Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue package refactor #5

Draft
wants to merge 16 commits into
base: develop
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implement changing reserve group.
sb10 committed Nov 11, 2020
commit 9b99105a1450e66cb6e146b5d102514a3dfd435d
2 changes: 1 addition & 1 deletion queue/heap.go
Original file line number Diff line number Diff line change
@@ -169,7 +169,7 @@ func (hq *HeapQueue) remove(item *Item) {
hq.mutex.Lock()
defer hq.mutex.Unlock()

if item.removed() { // || ! item.belongsTo(hq)
if item.removed() || !item.belongsTo(hq) {
return
}

8 changes: 8 additions & 0 deletions queue/item.go
Original file line number Diff line number Diff line change
@@ -158,6 +158,14 @@ func (item *Item) setSubqueue(sq subQueue) {
item.subQueue = sq
}

// belongsTo tells you if the item is set to the given subQueue.
func (item *Item) belongsTo(sq subQueue) bool {
item.mutex.RLock()
defer item.mutex.RUnlock()

return item.subQueue == sq
}

// index returns the index of this item in the subQueue it belongs to.
func (item *Item) index() int {
item.mutex.RLock()
18 changes: 15 additions & 3 deletions queue/queue.go
Original file line number Diff line number Diff line change
@@ -140,9 +140,9 @@ func (q *Queue) Get(key string) *Item {

// Reserve is a way to get the highest priority (or for those with equal
// priority, the largest, or for those with equal size, the oldest (by time
// since the item was first Add()ed) item in the ready sub-queue, switching it
// from the ready sub-queue to the run sub-queue, and in so doing starting its
// ttr countdown.
// since the item was first Add()ed) to the ready sub-queue, switching it from
// the ready sub-queue to the run sub-queue, and in so doing starting its ttr
// countdown.
//
// If the context is cancellable, we will wait until it is cancelled for an item
// to appear in the ready sub-queue, if at least 1 isn't already there. If after
@@ -170,3 +170,15 @@ func (q *Queue) Remove(key string) {
}
})
}

// ChangeReserveGroup changes the ReserveGroup of an item given its key, so that
// the next time it is Reserve()ed, you would have had to have supplied the
// given group to get it.
func (q *Queue) ChangeReserveGroup(key string, newGroup string) {
q.threadSafeItemsWriteOperation(func() {
if item, exists := q.items[key]; exists {
// *** item.doIfInState(StateReady)
q.readyQueues.changeItemReserveGroup(item, newGroup)
}
})
}
17 changes: 16 additions & 1 deletion queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -402,7 +402,7 @@ func TestQueueReserveWait(t *testing.T) {
func TestQueueReserveGroups(t *testing.T) {
num := 2
ips := newSetOfItemParameters(num)
rg1, rg2 := "1", "2"
rg1, rg2, rg3 := "1", "2", "3"
ips[0].ReserveGroup = rg1
ips[1].ReserveGroup = rg2

@@ -424,6 +424,21 @@ func TestQueueReserveGroups(t *testing.T) {
So(item, ShouldNotBeNil)
So(item.Key(), ShouldEqual, ips[0].Key)
})

Convey("You can change the group of an item and reserve it", func() {
q.ChangeReserveGroup(ips[0].Key, rg3)

item := q.Reserve(backgroundCtx, rg2)
So(item, ShouldNotBeNil)
So(item.Key(), ShouldEqual, ips[1].Key)

item = q.Reserve(backgroundCtx, rg1)
So(item, ShouldBeNil)

item = q.Reserve(backgroundCtx, rg3)
So(item, ShouldNotBeNil)
So(item.Key(), ShouldEqual, ips[0].Key)
})
})
}

21 changes: 21 additions & 0 deletions queue/ready.go
Original file line number Diff line number Diff line change
@@ -119,3 +119,24 @@ func (rq *readyQueues) remove(item *Item) {
defer rq.dropEmptyQueuesIfNotInUse()
hq.remove(item)
}

// changeItemReserveGroup atomically removes the item from one HeapQueue and
// pushes it to another.
func (rq *readyQueues) changeItemReserveGroup(item *Item, newGroup string) {
defer rq.dropEmptyQueuesIfNotInUse()
rq.mutex.Lock()
defer rq.mutex.Unlock()

oldGroup := item.ReserveGroup()
if oldGroup == newGroup {
return
}

hqOld := rq.reuseOrCreateHeapQueueForReserveGroup(oldGroup)
hqOld.remove(item)

hqNew := rq.reuseOrCreateHeapQueueForReserveGroup(newGroup)
item.SetReserveGroup(newGroup)
hqNew.push(item)
rq.inUse++
}
56 changes: 49 additions & 7 deletions queue/ready_test.go
Original file line number Diff line number Diff line change
@@ -37,18 +37,20 @@ import (
func TestQueueReady(t *testing.T) {
num := 2
ips := newSetOfItemParameters(num)
ipsSameRG := newSetOfItemParameters(num)
backgroundCtx := context.Background()

Convey("Given a readyQueues", t, func() {
rqs := newReadyQueues()
items := make([]*Item, num)

for i := 0; i < num; i++ {
ips[i].ReserveGroup = fmt.Sprintf("%d", i+1)
items[i] = ips[i].toItem()
}

Convey("Items with different ReserveGroups can be pushed to it simultaneously", func() {
items := make([]*Item, num)

for i := 0; i < num; i++ {
ips[i].ReserveGroup = fmt.Sprintf("%d", i+1)
items[i] = ips[i].toItem()
}

canDoConcurrently(num, func(i int) error {
rqs.push(items[i-1])

@@ -91,15 +93,55 @@ func TestQueueReady(t *testing.T) {
})

Convey("You can change the ReserveGroup of items", func() {
So(rqs.queues[ips[0].ReserveGroup].len(), ShouldEqual, 1)
So(rqs.queues[ips[1].ReserveGroup].len(), ShouldEqual, 1)

rqs.changeItemReserveGroup(items[0], ips[1].ReserveGroup)
So(len(rqs.queues), ShouldEqual, 1)
So(rqs.queues[ips[1].ReserveGroup].len(), ShouldEqual, 2)

rqs.changeItemReserveGroup(items[0], ips[1].ReserveGroup)
So(len(rqs.queues), ShouldEqual, 1)
So(rqs.queues[ips[1].ReserveGroup].len(), ShouldEqual, 2)
})

Convey("ReserveGroups can be changed simultaneously", func() {
newGroup := "foo"
canDoConcurrently(num, func(i int) error {
rqs.changeItemReserveGroup(items[i-1], newGroup)

return nil
})

So(len(rqs.queues), ShouldEqual, 1)
So(rqs.queues[newGroup].len(), ShouldEqual, 2)
item := rqs.pop(backgroundCtx, newGroup)
So(item, ShouldNotBeNil)
So(item, ShouldEqual, items[0])

var failed bool
for j := 0; j < 100; j++ {
j := j
canDoConcurrently(num, func(i int) error {
rqs.changeItemReserveGroup(items[1], fmt.Sprintf("%d.%d", j, i))

return nil
})

if len(rqs.queues) != 1 {
failed = true

break
}
}
So(failed, ShouldBeFalse)
})
})

Convey("Immediately after emptying a queue you can add items to it.", func() {
var failed bool

for j := 1; j < 10; j++ {
ipsSameRG := newSetOfItemParameters(num)
for i := 0; i < num; i++ {
rqs.push(ipsSameRG[i].toItem())
}