Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue package refactor #5

Draft
wants to merge 16 commits into
base: develop
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add run SubQueue, refactor ready to share code.
sb10 committed Nov 13, 2020
commit 52589a388f3ba97913615474aafd65de528ad996
117 changes: 46 additions & 71 deletions queue/heap.go
Original file line number Diff line number Diff line change
@@ -33,54 +33,53 @@ import (
"github.com/wtsi-ssg/wr/clog"
)

// HeapQueue holds Items in priority||size||age order. Use the private methods
// (which are thread safe), not the public ones that are there to implement
// heap.Interface.
type HeapQueue struct {
items []*Item
pushChs map[string]chan *Item
waitingPops []string
mutex sync.RWMutex
// heapQueue holds Items in a heap. It implements the SubQueue interface.
type heapQueue struct {
pushChs map[string]chan *Item
waitingPops []string
heapImplementation heap.Interface
mutex sync.RWMutex
}

// NewHeapQueue returns an initialised heap-based queue.
func NewHeapQueue() *HeapQueue {
hq := &HeapQueue{
pushChs: make(map[string]chan *Item),
// newHeapQueue returns an initialised heap-based queue.
func newHeapQueue(heapImplementation heap.Interface) *heapQueue {
hq := &heapQueue{
pushChs: make(map[string]chan *Item),
heapImplementation: heapImplementation,
}

heap.Init(hq)
heap.Init(hq.heapImplementation)

return hq
}

// push adds an item to the queue.
func (hq *HeapQueue) push(item *Item) {
func (hq *heapQueue) push(item *Item) {
hq.mutex.Lock()
defer hq.mutex.Unlock()
defer hq.popIfWaiting()

item.setSubqueue(hq)
heap.Push(hq, item)
heap.Push(hq.heapImplementation, item)
}

// popIfWaiting pops the item we just pushed and sends it down the next pushCh,
// if any exist.
//
// You must hold the mutex lock before calling this.
func (hq *HeapQueue) popIfWaiting() {
func (hq *heapQueue) popIfWaiting() {
pushCh := hq.getNextPushCh()
if pushCh == nil {
return
}
pushCh <- heap.Pop(hq).(*Item)
pushCh <- heap.Pop(hq.heapImplementation).(*Item)
}

// getNextPushCh finds the oldest waitingPops id that still has a pushChs
// entry, deletes the entry and returns the corresponding channel.
//
// You must hold the mutex lock before calling this.
func (hq *HeapQueue) getNextPushCh() chan *Item {
func (hq *heapQueue) getNextPushCh() chan *Item {
for {
if len(hq.waitingPops) == 0 {
return nil
@@ -97,16 +96,16 @@ func (hq *HeapQueue) getNextPushCh() chan *Item {
}
}

// pop removes and returns the highest priority||size||oldest item in the queue.
// pop removes and returns the next item in the queue.
//
// If there are currently no items in the queue, will wait for the context to be
// cancelled and return the next item push()ed to the queue before then, or if
// nothing gets pushed (or the context wasn't cancellable), nil.
func (hq *HeapQueue) pop(ctx context.Context) *Item {
func (hq *heapQueue) pop(ctx context.Context) *Item {
hq.mutex.Lock()

done := ctx.Done()
if hq.Len() == 0 {
if hq.heapImplementation.Len() == 0 {
if done == nil {
hq.mutex.Unlock()

@@ -126,13 +125,13 @@ func (hq *HeapQueue) pop(ctx context.Context) *Item {

defer hq.mutex.Unlock()

return heap.Pop(hq).(*Item)
return heap.Pop(hq.heapImplementation).(*Item)
}

// nextPushChannel returns a channel that will be sent the next item push()ed.
//
// You must hold the mutex lock before calling this.
func (hq *HeapQueue) nextPushChannel() (string, chan *Item) {
func (hq *heapQueue) nextPushChannel() (string, chan *Item) {
id := clog.UniqueID()
hq.waitingPops = append(hq.waitingPops, id)

@@ -148,7 +147,7 @@ func (hq *HeapQueue) nextPushChannel() (string, chan *Item) {
//
// Otherwise, we delete the id from pushChs, so that the next getNextPushCh()
// doesn't try and use this channel.
func (hq *HeapQueue) readFromPushChannelIfSentOn(id string, pushCh chan *Item) *Item {
func (hq *heapQueue) readFromPushChannelIfSentOn(id string, pushCh chan *Item) *Item {
hq.mutex.Lock()
if _, exists := hq.pushChs[id]; !exists {
hq.mutex.Unlock()
@@ -165,80 +164,56 @@ func (hq *HeapQueue) readFromPushChannelIfSentOn(id string, pushCh chan *Item) *
}

// remove removes a given item from the queue.
func (hq *HeapQueue) remove(item *Item) {
func (hq *heapQueue) remove(item *Item) {
hq.mutex.Lock()
defer hq.mutex.Unlock()

if item.removed() || !item.belongsTo(hq) {
return
}

heap.Remove(hq, item.index())
heap.Remove(hq.heapImplementation, item.index())
}

// update changes the item's position in the queue if its priority or size have
// changed. This implements the subQueue interface.
func (hq *HeapQueue) update(item *Item) {
// update changes the item's position in the queue if its order properties have
// changed.
func (hq *heapQueue) update(item *Item) {
hq.mutex.Lock()
defer hq.mutex.Unlock()
heap.Fix(hq, item.index())
heap.Fix(hq.heapImplementation, item.index())
}

// len returns the number of items in the queue.
func (hq *HeapQueue) len() int {
func (hq *heapQueue) len() int {
hq.mutex.RLock()
defer hq.mutex.RUnlock()

return hq.Len()
return hq.heapImplementation.Len()
}

// Len is to implement heap.Interface.
func (hq *HeapQueue) Len() int { return len(hq.items) }

// Less is to implement heap.Interface.
func (hq *HeapQueue) Less(i, j int) bool {
ip := hq.items[i].Priority()
jp := hq.items[j].Priority()

if ip == jp {
is := hq.items[i].Size()
js := hq.items[j].Size()

if is == js {
return hq.items[i].Created().Before(hq.items[j].Created())
}

return is > js
}

return ip > jp
}

// Swap is to implement heap.Interface.
func (hq *HeapQueue) Swap(i, j int) {
hq.items[i], hq.items[j] = hq.items[j], hq.items[i]
hq.items[i].setIndex(i)
hq.items[j].setIndex(j)
// heapSwap can be used to implement heap.Interface.Swap.
func heapSwap(items []*Item, i, j int) {
items[i], items[j] = items[j], items[i]
items[i].setIndex(i)
items[j].setIndex(j)
}

// Push is to implement heap.Interface.
func (hq *HeapQueue) Push(x interface{}) {
n := len(hq.items)
func heapPush(items []*Item, x interface{}) []*Item {
n := len(items)
item := x.(*Item)
item.setIndex(n)
hq.items = append(hq.items, item)
return append(items, item)
}

// Pop is to implement heap.Interface.
func (hq *HeapQueue) Pop() interface{} {
old := hq.items
n := len(old)
// heapPop can be used to implement heap.Interface.Pop.
func heapPop(items []*Item) ([]*Item, interface{}) {
n := len(items)

item := old[n-1]
old[n-1] = nil
hq.items = old[0 : n-1]
item := items[n-1]
items[n-1] = nil
new := items[0 : n-1]

item.remove()

return item
return new, item
}
302 changes: 148 additions & 154 deletions queue/heap_test.go

Large diffs are not rendered by default.

52 changes: 45 additions & 7 deletions queue/item.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,10 @@ import (
"time"
)

// DefaultTTR is the time to release used for items that were specified with a
// 0 TTR.
const DefaultTTR = 5 * time.Second

const indexOfRemovedItem = -1

// ItemParameters describe an item you want to add to the queue.
@@ -42,6 +46,7 @@ type ItemParameters struct {
Data interface{}
Priority uint8 // highest priority is 255
Size uint8
TTR time.Duration // if 0, defaults to DefaultTTR
}

// toItem creates an item based on our parameters.
@@ -53,6 +58,7 @@ func (ip *ItemParameters) toItem() *Item {
created: time.Now(),
priority: ip.Priority,
size: ip.Size,
ttr: ip.TTR,
}
}

@@ -65,7 +71,9 @@ type Item struct {
created time.Time
priority uint8
size uint8
subQueue subQueue
ttr time.Duration
releaseAt time.Time
subQueue SubQueue
subQueueIndex int
mutex sync.RWMutex
}
@@ -118,13 +126,13 @@ func (item *Item) Priority() uint8 {
return item.priority
}

// SetPriority sets a new priority for the item, and updates the subQueue it
// SetPriority sets a new priority for the item, and updates the SubQueue it
// belongs to.
func (item *Item) SetPriority(p uint8) {
item.setAndUpdate(&item.priority, p)
}

// setAndUpdate sets a property and updates the subQueue in a thread-safe way.
// setAndUpdate sets a property and updates the SubQueue in a thread-safe way.
func (item *Item) setAndUpdate(property *uint8, new uint8) {
item.mutex.Lock()
*property = new
@@ -146,20 +154,50 @@ func (item *Item) Size() uint8 {
return item.size
}

// SetSize sets a new size for the item, and updates the subQueue it belongs to.
// SetSize sets a new size for the item, and updates the SubQueue it belongs to.
func (item *Item) SetSize(s uint8) {
item.setAndUpdate(&item.size, s)
}

// setSubqueue sets a new subQueue for the item.
func (item *Item) setSubqueue(sq subQueue) {
// Touch updates the releaseAt for the item to now+TTR, and updates the
// SubQueue it belongs to.
func (item *Item) Touch() {
item.mutex.Lock()

var ttr time.Duration
if item.ttr == 0 {
ttr = DefaultTTR
}

item.releaseAt = time.Now().Add(ttr)
sq := item.subQueue
item.mutex.Unlock()

if sq == nil {
return
}

sq.update(item)
}

// ReleaseAt returns the time that this item's TTR will run out. It will be the
// zero time if this item has not yet been Touch()ed.
func (item *Item) ReleaseAt() time.Time {
item.mutex.RLock()
defer item.mutex.RUnlock()

return item.releaseAt
}

// setSubqueue sets a new SubQueue for the item.
func (item *Item) setSubqueue(sq SubQueue) {
item.mutex.Lock()
defer item.mutex.Unlock()
item.subQueue = sq
}

// belongsTo tells you if the item is set to the given subQueue.
func (item *Item) belongsTo(sq subQueue) bool {
func (item *Item) belongsTo(sq SubQueue) bool {
item.mutex.RLock()
defer item.mutex.RUnlock()

121 changes: 85 additions & 36 deletions queue/item_test.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
package queue

import (
"context"
"sync"
"testing"
"time"
@@ -35,12 +36,23 @@ import (

type mockSubQueue struct {
updates int
sync.Mutex
}

func (sq *mockSubQueue) update(item *Item) {
sq.Lock()
defer sq.Unlock()
sq.updates++
}

func (sq *mockSubQueue) push(*Item) {}

func (sq *mockSubQueue) pop(context.Context) *Item { return nil }

func (sq *mockSubQueue) remove(*Item) {}

func (sq *mockSubQueue) len() int { return 0 }

func TestQueueItem(t *testing.T) {
Convey("You can make items from ItemParameters", t, func() {
before := time.Now()
@@ -59,6 +71,8 @@ func TestQueueItem(t *testing.T) {
So(item.Created(), ShouldHappenAfter, before)
So(item.Priority(), ShouldEqual, 0)
So(item.Size(), ShouldEqual, 0)
So(item.ttr, ShouldEqual, 0)
So(item.releaseAt, ShouldResemble, time.Time{})

p, s := uint8(5), uint8(3)
ip = &ItemParameters{
@@ -87,8 +101,9 @@ func TestQueueItem(t *testing.T) {
sq := &mockSubQueue{}
item.setSubqueue(sq)
So(item.subQueue, ShouldEqual, sq)
So(item.belongsTo(sq), ShouldBeTrue)

Convey("When you set priority or size, the subQueue is updated", func() {
Convey("When you set priority or size or Touch(), the subQueue is updated", func() {
new := uint8(10)
item.SetPriority(new)
So(item.Priority(), ShouldEqual, new)
@@ -97,10 +112,18 @@ func TestQueueItem(t *testing.T) {
item.SetSize(new)
So(item.Size(), ShouldEqual, new)
So(sq.updates, ShouldEqual, 2)

item.Touch()
t := time.Now()
So(item.releaseAt, ShouldHappenBetween, t, t.Add(DefaultTTR))
So(sq.updates, ShouldEqual, 3)
})

Convey("And then you can remove() it", func() {
remove()
So(item.subQueue, ShouldBeNil)
So(item.belongsTo(sq), ShouldBeFalse)
So(item.index(), ShouldEqual, indexOfRemovedItem)
})
})

@@ -116,50 +139,76 @@ func TestQueueItem(t *testing.T) {

Convey("You can set and get item properties simultaneously", func() {
sq := &mockSubQueue{}
newVal := uint8(10)

var wg sync.WaitGroup
wg.Add(5)

change := func(p, s uint8, index int, data interface{}) {
defer wg.Done()
item.SetReserveGroup("rg")
canDoInPairsConcurrently(func() {
item.setSubqueue(sq)
item.SetPriority(p)
item.SetSize(s)
item.remove()
item.setIndex(index)
item.SetData(data)
}

read := func() {
defer wg.Done()
item.Key()
item.ReserveGroup()
item.Created()
item.setIndex(int(newVal))
item.SetPriority(newVal)
item.SetSize(newVal)
item.SetData("foo")
}, func() {
item.belongsTo(sq)
item.index()
item.Priority()
item.Size()
item.removed()
item.index()
item.Data()
}

go change(10, 10, 10, "foo")
go read()
go change(11, 11, 11, "bar")
go func() {
defer wg.Done()
item.setIndex(12)
}()
go read()
wg.Wait()

So(item.Size(), ShouldBeBetweenOrEqual, 10, 11)
So(item.Priority(), ShouldBeBetweenOrEqual, 10, 11)
So(item.index(), ShouldBeBetweenOrEqual, 10, 12)
item.Key()
item.Created()
})

canDoInPairsConcurrently(func() {
item.SetReserveGroup("rg")
}, func() {
item.ReserveGroup()
})

canDoInPairsConcurrently(item.Touch, func() {
item.ReleaseAt()
})

So(item.Size(), ShouldEqual, newVal)
So(item.Priority(), ShouldEqual, newVal)
So(item.index(), ShouldEqual, int(newVal))
So(item.Data(), ShouldNotEqual, data)
So(item.Key(), ShouldEqual, key)
So(item.ReserveGroup(), ShouldEqual, "rg")
So(item.ReleaseAt(), ShouldHappenAfter, time.Now())

canDoInPairsConcurrently(item.remove, func() {
item.removed()
})

So(item.removed(), ShouldBeTrue)

canDoInPairsConcurrently(func() {
item.remove()
item.setSubqueue(sq)
item.setIndex(int(newVal))
}, func() {
item.removed()
})

So(item.removed(), ShouldBeFalse)
})
})
}

func canDoInPairsConcurrently(f1 func(), f2 func()) {
var wg sync.WaitGroup

for i := 1; i <= 10; i++ {
wg.Add(2)

go func() {
defer wg.Done()
f1()
}()
go func() {
defer wg.Done()
f2()
}()
}

wg.Wait()
}
24 changes: 18 additions & 6 deletions queue/queue.go
Original file line number Diff line number Diff line change
@@ -63,16 +63,28 @@ import (
"sync"
)

// subQueue is something that an Item belongs to, and that can update the Item's
// position in itself when update() is called.
type subQueue interface {
// SubQueue is something that an Item belongs to, which stores the item in a
// certain order for later retrieval.
type SubQueue interface {
// push adds an item to the queue.
push(*Item)

// pop removes and returns an item in the queue based on a certain order.
pop(context.Context) *Item

// remove removes a given item from the queue.
remove(*Item)

// update changes the item's position in the queue if relevant item
// properties have changed.
update(*Item)

// len returns the number of items in the queue.
len() int
}

// Queue is an in-memory poll-free heap-based priorty queue with various
// sub-queues for managing item progress.
// Queue is an in-memory poll-free queue with various heap-based ordered
// SubQueues for managing item progress.
type Queue struct {
items map[string]*Item
itemsMutex sync.RWMutex
@@ -177,7 +189,7 @@ func (q *Queue) Remove(key string) {
func (q *Queue) ChangeReserveGroup(key string, newGroup string) {
q.threadSafeItemsWriteOperation(func() {
if item, exists := q.items[key]; exists {
// *** item.doIfInState(StateReady)
// *** item.doIfInState(ItemStateReady)
q.readyQueues.changeItemReserveGroup(item, newGroup)
}
})
121 changes: 86 additions & 35 deletions queue/ready.go
Original file line number Diff line number Diff line change
@@ -30,72 +30,123 @@ import (
"sync"
)

// readyQueues is a slice of *HeapQueue that will newly create or reuse a
// HeapQueue for each item.reserveGroup push()ed to it.
// prioritySizeAgeOrder implements heap.Interface, keeping items in
// priority||size||age order.
type prioritySizeAgeOrder struct {
items []*Item
}

// newReadySubQueue creates a *heapQueue that is ordered by priority||size||age.
func newReadySubQueue() SubQueue {
return newHeapQueue(&prioritySizeAgeOrder{})
}

// Len is to implement heap.Interface.
func (po *prioritySizeAgeOrder) Len() int { return len(po.items) }

// Less is to implement heap.Interface.
func (po *prioritySizeAgeOrder) Less(i, j int) bool {
ip := po.items[i].Priority()
jp := po.items[j].Priority()

if ip == jp {
is := po.items[i].Size()
js := po.items[j].Size()

if is == js {
return po.items[i].Created().Before(po.items[j].Created())
}

return is > js
}

return ip > jp
}

// Swap is to implement heap.Interface.
func (po *prioritySizeAgeOrder) Swap(i, j int) {
heapSwap(po.items, i, j)
}

// Push is to implement heap.Interface.
func (po *prioritySizeAgeOrder) Push(x interface{}) {
po.items = heapPush(po.items, x)
}

// Pop is to implement heap.Interface.
func (po *prioritySizeAgeOrder) Pop() interface{} {
var item interface{}
po.items, item = heapPop(po.items)

return item
}

// readyQueues is a slice of ready SubQueue that will newly create or reuse a
// SubQueue for each item.reserveGroup push()ed to it.
type readyQueues struct {
queues map[string]*HeapQueue
queues map[string]SubQueue
inUse int
mutex sync.Mutex
}

// newReadyQueues creates a readyQueues.
func newReadyQueues() *readyQueues {
return &readyQueues{
queues: make(map[string]*HeapQueue),
queues: make(map[string]SubQueue),
}
}

// push will newly create or reuse a *HeapQueue for the item's reserveGroup and
// push the item to that HeapQueue.
// push will newly create or reuse a SubQueue for the item's reserveGroup and
// push the item to that SubQueue.
func (rq *readyQueues) push(item *Item) {
rq.mutex.Lock()
defer rq.mutex.Unlock()

hq := rq.reuseOrCreateHeapQueueForReserveGroup(item.ReserveGroup())
hq.push(item)
sq := rq.reuseOrCreateReadyQueueForReserveGroup(item.ReserveGroup())
sq.push(item)
}

// reuseOrCreateHeapQueueForReserveGroup creates a new HeapQueue for the given
// reserveGroup, or returns any existing one.
// reuseOrCreateReadyQueueForReserveGroup creates a new ready SubQueue for the
// given reserveGroup, or returns any existing one.
//
// You must hold the mutex lock before calling this.
func (rq *readyQueues) reuseOrCreateHeapQueueForReserveGroup(reserveGroup string) *HeapQueue {
func (rq *readyQueues) reuseOrCreateReadyQueueForReserveGroup(reserveGroup string) SubQueue {
var (
hq *HeapQueue
sq SubQueue
found bool
)

if hq, found = rq.queues[reserveGroup]; !found {
hq = NewHeapQueue()
rq.queues[reserveGroup] = hq
if sq, found = rq.queues[reserveGroup]; !found {
sq = newReadySubQueue()
rq.queues[reserveGroup] = sq
}

return hq
return sq
}

// pop will pop the next Item from the HeapQueue corresponding to the given
// pop will pop the next Item from the SubQueue corresponding to the given
// group.
func (rq *readyQueues) pop(ctx context.Context, reserveGroup string) *Item {
hq := rq.getHeapQueueForUse(reserveGroup)
sq := rq.getHeapQueueForUse(reserveGroup)
defer rq.dropEmptyQueuesIfNotInUse()

return hq.pop(ctx)
return sq.pop(ctx)
}

// getHeapQueueForUse calls reuseOrCreateHeapQueueForReserveGroup() and
// getHeapQueueForUse calls reuseOrCreateReadyQueueForReserveGroup() and
// increments inUse. To be used in a paired call with
// dropEmptyQueuesIfNotInUse().
func (rq *readyQueues) getHeapQueueForUse(reserveGroup string) *HeapQueue {
func (rq *readyQueues) getHeapQueueForUse(reserveGroup string) SubQueue {
rq.mutex.Lock()
hq := rq.reuseOrCreateHeapQueueForReserveGroup(reserveGroup)
sq := rq.reuseOrCreateReadyQueueForReserveGroup(reserveGroup)
rq.inUse++
rq.mutex.Unlock()

return hq
return sq
}

// dropEmptyQueuesIfNotInUse removes empty HeapQueues from our map if they
// are empty and we are not in the middle of using any queues.
// dropEmptyQueuesIfNotInUse removes empty SubQueues from our map if we are not
// in the middle of using any SubQueues.
func (rq *readyQueues) dropEmptyQueuesIfNotInUse() {
rq.mutex.Lock()
defer rq.mutex.Unlock()
@@ -105,22 +156,22 @@ func (rq *readyQueues) dropEmptyQueuesIfNotInUse() {
return
}

for reserveGroup, hq := range rq.queues {
if hq.len() == 0 {
for reserveGroup, sq := range rq.queues {
if sq.len() == 0 {
delete(rq.queues, reserveGroup)
}
}
}

// remove will remove the given Item from the HeapQueue corresponding to the
// remove will remove the given Item from the SubQueue corresponding to the
// item's reserveGroup.
func (rq *readyQueues) remove(item *Item) {
hq := rq.getHeapQueueForUse(item.ReserveGroup())
sq := rq.getHeapQueueForUse(item.ReserveGroup())
defer rq.dropEmptyQueuesIfNotInUse()
hq.remove(item)
sq.remove(item)
}

// changeItemReserveGroup atomically removes the item from one HeapQueue and
// changeItemReserveGroup atomically removes the item from one SubQueue and
// pushes it to another.
func (rq *readyQueues) changeItemReserveGroup(item *Item, newGroup string) {
defer rq.dropEmptyQueuesIfNotInUse()
@@ -132,11 +183,11 @@ func (rq *readyQueues) changeItemReserveGroup(item *Item, newGroup string) {
return
}

hqOld := rq.reuseOrCreateHeapQueueForReserveGroup(oldGroup)
hqOld.remove(item)
sqOld := rq.reuseOrCreateReadyQueueForReserveGroup(oldGroup)
sqOld.remove(item)

hqNew := rq.reuseOrCreateHeapQueueForReserveGroup(newGroup)
sqNew := rq.reuseOrCreateReadyQueueForReserveGroup(newGroup)
item.SetReserveGroup(newGroup)
hqNew.push(item)
sqNew.push(item)
rq.inUse++
}
113 changes: 113 additions & 0 deletions queue/ready_test.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,119 @@ import (
. "github.com/smartystreets/goconvey/convey"
)

func TestQueueReadyPushPop(t *testing.T) {
num := 6
ips := newSetOfItemParameters(num)
ctx := context.Background()

Convey("Given a ready SubQueue", t, func() {
sq := newReadySubQueue()

Convey("You can push() equal priority items in to it", func() {
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {})

Convey("And then pop() them out in FIFO order", func() {
testPopsInInsertionOrder(ctx, sq, num, ips)
})
})

testPopsReverseOrder := func() {
popItemsFromSubQueue(sq, num, func(key string, i int) {
So(key, ShouldEqual, ips[num-1-i].Key)
})
}

Convey("You can push() different priority items in to it", func() {
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
item.priority = uint8(i)
})

Convey("And then pop() them out in priority order", func() {
testPopsReverseOrder()
})
})

Convey("You can push() different size items in to it", func() {
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
item.size = uint8(i)
})

Convey("And then pop() them out in size order", func() {
testPopsReverseOrder()
})
})

Convey("Priority has precedence over size which has precedence over age", func() {
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
switch i {
case 3:
item.priority = uint8(3)
item.size = uint8(4)
case 4:
item.priority = uint8(3)
item.size = uint8(5)
}
})

popItemsFromSubQueue(sq, num, func(key string, i int) {
switch i {
case 0:
So(key, ShouldEqual, ips[4].Key)
case 1:
So(key, ShouldEqual, ips[3].Key)
case 2, 3, 4:
So(key, ShouldEqual, ips[i-2].Key)
default:
So(key, ShouldEqual, ips[i].Key)
}
})
})
})
}

func TestQueueReadyUpdate(t *testing.T) {
num := 6
ips := newSetOfItemParameters(num)
for i := 0; i < num; i++ {
ips[i].Priority = 5
ips[i].Size = 5
}
ctx := context.Background()

Convey("Given a ready SubQueue with some items push()ed to it", t, func() {
sq := newReadySubQueue()
items := make([]*Item, num)
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
items[i] = item
})
So(sq.len(), ShouldEqual, num)

update2p := func() {
items[2].SetPriority(1)
}
update2s := func() {
items[2].SetSize(2)
}

Convey("You can update() an item's priority, which changes pop() order", func() {
update2p()
So(sq.len(), ShouldEqual, num)
testPopsInAlteredOrder(ctx, sq, num, ips)
})

Convey("You can update() an item's size, which changes pop() order", func() {
update2s()
So(sq.len(), ShouldEqual, num)
testPopsInAlteredOrder(ctx, sq, num, ips)
})

Convey("You can do simultaneous update()s", func() {
testSimultaneousUpdates(update2p, update2s)
testPopsInAlteredOrder(ctx, sq, num, ips)
})
})
}

func TestQueueReady(t *testing.T) {
num := 2
ips := newSetOfItemParameters(num)
62 changes: 62 additions & 0 deletions queue/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*******************************************************************************
* Copyright (c) 2020 Genome Research Ltd.
*
* Author: Sendu Bala <sb10@sanger.ac.uk>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
******************************************************************************/

package queue

// releaseOrder implements heap.Interface, keeping items in releaseAt order.
type releaseOrder struct {
items []*Item
}

// newRunSubQueue creates a *heapQueue that is ordered by releaseAt.
func newRunSubQueue() SubQueue {
return newHeapQueue(&releaseOrder{})
}

// Len is to implement heap.Interface.
func (ro *releaseOrder) Len() int { return len(ro.items) }

// Less is to implement heap.Interface.
func (ro *releaseOrder) Less(i, j int) bool {
return ro.items[i].ReleaseAt().Before(ro.items[j].ReleaseAt())
}

// Swap is to implement heap.Interface.
func (ro *releaseOrder) Swap(i, j int) {
heapSwap(ro.items, i, j)
}

// Push is to implement heap.Interface.
func (ro *releaseOrder) Push(x interface{}) {
ro.items = heapPush(ro.items, x)
}

// Pop is to implement heap.Interface.
func (ro *releaseOrder) Pop() interface{} {
var item interface{}
ro.items, item = heapPop(ro.items)

return item
}
97 changes: 97 additions & 0 deletions queue/run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*******************************************************************************
* Copyright (c) 2020 Genome Research Ltd.
*
* Author: Sendu Bala <sb10@sanger.ac.uk>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
******************************************************************************/

package queue

import (
"context"
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"
)

func TestQueueRunPushPop(t *testing.T) {
num := 6
ips := newSetOfItemParameters(num)
ctx := context.Background()

Convey("Given a run SubQueue", t, func() {
sq := newRunSubQueue()

Convey("You can push() increasing releaseAt items in to it", func() {
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
item.Touch()
})

Convey("And then pop() them out in releaseAt order", func() {
testPopsInInsertionOrder(ctx, sq, num, ips)
})
})

Convey("You can push() reversed releaseAt items in to it", func() {
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
item.releaseAt = time.Now().Add(time.Duration(num-i) * time.Millisecond)
})

Convey("And then pop() them out in releaseAt order", func() {
popItemsFromSubQueue(sq, num, func(key string, i int) {
So(key, ShouldEqual, ips[num-1-i].Key)
})
})
})
})
}

func TestQueueRunUpdate(t *testing.T) {
num := 6
ips := newSetOfItemParameters(num)
ctx := context.Background()

Convey("Given a run SubQueue with some items push()ed to it", t, func() {
sq := newRunSubQueue()
items := make([]*Item, num)
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
item.Touch()
items[i] = item
})
So(sq.len(), ShouldEqual, num)

update := func() {
items[2].Touch()
}

Convey("You can Touch() an item, which changes pop() order", func() {
update()
So(sq.len(), ShouldEqual, num)
testPopsInAlteredOrder(ctx, sq, num, ips)
})

Convey("You can do simultaneous update()s", func() {
testSimultaneousUpdates(update, update)
testPopsInAlteredOrder(ctx, sq, num, ips)
})
})
}