Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue package refactor #5

Draft
wants to merge 16 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ jobs:

- name: Run tests
run: make test

- name: Run race tests
run: make race
5 changes: 4 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ issues:
- path: _test\.go
linters:
- funlen
- path: clog.go
- path: clog/clog.go
linters:
- gochecknoinits
- path: queue/item.go
linters:
- gochecknoglobals
max-issues-per-linter: 0
max-same-issues: 0
new-from-rev: master
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ require (
github.com/go-stack/stack v1.8.0 // indirect
github.com/inconshreveable/log15 v0.0.0-20200109203555-b30bc20e4fd1
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/ricochet2200/go-disk-usage v0.0.0-20150921141558-f0d1b743428f
github.com/rs/xid v1.2.1
github.com/sasha-s/go-deadlock v0.2.0
github.com/sb10/l15h v0.0.0-20170510122137-64c488bf8e22
github.com/smartystreets/goconvey v1.6.4
golang.org/x/sys v0.0.0-20200918174421-af09f7315aff // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/ricochet2200/go-disk-usage v0.0.0-20150921141558-f0d1b743428f h1:w4VLAgWDnrcBDFSi8Ppn/MrB/Z1A570+MV90CvMtVVA=
github.com/ricochet2200/go-disk-usage v0.0.0-20150921141558-f0d1b743428f/go.mod h1:yhevTRDiduxPJHQDCtlqUn53ojFPkRh/mKhMUzQUCpc=
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y=
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/sb10/l15h v0.0.0-20170510122137-64c488bf8e22 h1:1ECjRVBhG3NLRKTbvZ07fIQ5BiLnZFc3qLxqM6H6Rn8=
github.com/sb10/l15h v0.0.0-20170510122137-64c488bf8e22/go.mod h1:s4RlXXC/L+BTwtp3zv5UREYJOftKFBWLsUCILdaMYeU=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
Expand Down
55 changes: 55 additions & 0 deletions queue/delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*******************************************************************************
* Copyright (c) 2020 Genome Research Ltd.
*
* Author: Sendu Bala <sb10@sanger.ac.uk>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
******************************************************************************/

package queue

import (
"time"
)

// readyOrder implements heapWithNext, keeping items in readyAt order.
type readyOrder struct {
*basicHeapWithNext
}

func newReadyOrder() *readyOrder {
return &readyOrder{basicHeapWithNext: &basicHeapWithNext{}}
}

// Less is to implement heap.Interface.
func (ro *readyOrder) Less(i, j int) bool {
return ro.items[i].ReadyAt().Before(ro.items[j].ReadyAt())
}

// newDelaySubQueue creates a SubQueue that is ordered by readyAt and passes
// expired readyAt items to the given callback.
func newDelaySubQueue(cb itemExpirationCB) SubQueue {
return newExpireSubQueue(cb, getItemReady, newReadyOrder())
}

// getItemReady is run SubQueue's itemTimeCB.
func getItemReady(item *Item) time.Time {
return item.ReadyAt()
}
101 changes: 101 additions & 0 deletions queue/delay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*******************************************************************************
* Copyright (c) 2020 Genome Research Ltd.
*
* Author: Sendu Bala <sb10@sanger.ac.uk>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
******************************************************************************/

package queue

import (
"context"
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"
)

func TestQueueDelayPushPop(t *testing.T) {
num := 6
ips := newSetOfItemParameters(num)
ctx := context.Background()

Convey("Given a delay SubQueue", t, func() {
sq := newDelaySubQueue(func(*Item) (bool, chan struct{}) {
return true, make(chan struct{})
})

Convey("You can push() increasing readyAt items in to it", func() {
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
item.restart()
})

Convey("And then pop() them out in readyAt order", func() {
testPopsInInsertionOrder(ctx, sq, num, ips)
})
})

SkipConvey("You can push() reversed readyAt items in to it", func() {
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
item.readyAt = time.Now().Add(time.Duration(num-i) * time.Millisecond)
})

Convey("And then pop() them out in readyAt order", func() {
popItemsFromSubQueue(sq, num, func(key string, i int) {
So(key, ShouldEqual, ips[num-1-i].Key)
})
})
})
})
}

func TestQueueDelayUpdate(t *testing.T) {
num := 6
ips := newSetOfItemParameters(num)
ctx := context.Background()

SkipConvey("Given a delay SubQueue with some items push()ed to it", t, func() {
sq := newDelaySubQueue(func(*Item) (bool, chan struct{}) {
return true, make(chan struct{})
})
items := make([]*Item, num)
pushItemsToSubQueue(sq, ips, func(item *Item, i int) {
item.restart()
items[i] = item
})
So(sq.len(), ShouldEqual, num)

update := func() {
items[2].restart()
}

Convey("You can restart() an item, which changes pop() order", func() {
update()
So(sq.len(), ShouldEqual, num)
testPopsInAlteredOrder(ctx, sq, num, ips)
})

Convey("You can do simultaneous update()s", func() {
testSimultaneousUpdates(update, update)
testPopsInAlteredOrder(ctx, sq, num, ips)
})
})
}
185 changes: 185 additions & 0 deletions queue/expire.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*******************************************************************************
* Copyright (c) 2020 Genome Research Ltd.
*
* Author: Sendu Bala <sb10@sanger.ac.uk>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
******************************************************************************/

package queue

import (
"context"
"time"

sync "github.com/sasha-s/go-deadlock"
)

// itemExpirationCB is like ExpirationCB, but takes an item.
type itemExpirationCB func(*Item) (bool, chan struct{})

// itemTimeCB is something that will be called to find out if an item has
// expired.
type itemTimeCB func(*Item) time.Time

// expireSubQueue is a heapQueue that deals with items that are older than
// one of their time.Time properties by passing them to a callback.
type expireSubQueue struct {
*heapQueue
expireCB itemExpirationCB
timeCB itemTimeCB
expireTime time.Time
expireMutex sync.RWMutex
updateNextItemToExpire chan *Item
nextExpiringItem *Item
}

func newExpireSubQueue(expireCB itemExpirationCB, timeCB itemTimeCB, heapImplementation heapWithNext) SubQueue {
esq := &expireSubQueue{
expireCB: expireCB,
timeCB: timeCB,
expireTime: time.Now().Add(unsetItemExpiry),
updateNextItemToExpire: make(chan *Item),
}

esq.heapQueue = newHeapQueue(heapImplementation)

go esq.processExpiringItems()

return esq
}

// push adds an item to the queue.
func (esq *expireSubQueue) push(item *Item) {
esq.expireMutex.Lock()
esq.heapQueue.push(item)
esq.considerNextExpiringItem()
}

// considerNextExpiringItem will trigger processing of the next item that would
// be popped. You must hold the expireMutex lock before calling this.
func (esq *expireSubQueue) considerNextExpiringItem() {
item := esq.heapQueue.nextItem()
beingConsidered := esq.nextExpiringItem

if item == nil {
esq.expireMutex.Unlock()

return
}

if beingConsidered != nil {
if item.Key() == beingConsidered.Key() {
esq.expireMutex.Unlock()

return
}
}
esq.expireMutex.Unlock()

esq.updateNextItemToExpire <- item
}

// pop removes and returns the next item in the queue, waiting like heapQueue.
func (esq *expireSubQueue) pop(ctx context.Context) *Item {
esq.expireMutex.Lock()
item := esq.heapQueue.pop(ctx)
esq.considerNextExpiringItem()

return item
}

// remove removes a given item from the queue.
func (esq *expireSubQueue) remove(item *Item) {
esq.expireMutex.Lock()
esq.heapQueue.remove(item)
esq.considerNextExpiringItem()
}

// update changes the item's position in the queue if its order properties have
// changed.
func (esq *expireSubQueue) update(item *Item) {
esq.expireMutex.Lock()
esq.heapQueue.update(item)
esq.considerNextExpiringItem()
}

// processExpiringItems starts waiting for items to expire and calls our
// expireCB when they are.
func (esq *expireSubQueue) processExpiringItems() {
item := <-esq.updateNextItemToExpire

for {
itemExpires := esq.itemExpires(item)

select {
case <-itemExpires.C:
itemExpires.Stop()

item = esq.sendItemToExpireCB(item)
case item = <-esq.updateNextItemToExpire:
itemExpires.Stop()
}
}
}

// itemExpires returns a timer for when the given item is supposed to expire. If
// the item is nil, timer effectively does not go off.
func (esq *expireSubQueue) itemExpires(item *Item) *time.Timer {
esq.expireMutex.Lock()
defer esq.expireMutex.Unlock()

esq.expireTime = esq.timeCB(item)
esq.nextExpiringItem = item

return time.NewTimer(time.Until(esq.expireTime))
}

// sendItemToExpireCB sends non-nil items to our expireCB and returns the next
// item to expire.
func (esq *expireSubQueue) sendItemToExpireCB(item *Item) *Item {
if item == nil {
return nil
}

esq.expireMutex.Lock()
if item.removed() {
esq.expireMutex.Unlock()

return nil
}

remove, ch := esq.expireCB(item)

if remove {
esq.heapQueue.remove(item)
next := esq.heapQueue.nextItem()
esq.expireMutex.Unlock()
close(ch)
return next
}

// *** else reset expiration
close(ch)

esq.expireMutex.Unlock()

return nil
}
Loading