Skip to content

Commit

Permalink
Make TopKQueue thread-safe. Get rid of similarityHolder. (suggest-go#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
alldroll authored Dec 13, 2020
1 parent 7607005 commit c5af0db
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
25 changes: 2 additions & 23 deletions pkg/suggest/suggester.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,18 @@ func (n *nGramSuggester) Suggest(config SearchConfig) ([]Candidate, error) {
bMax = lenIndices - 1
}

// store similarity as atomic value
// we are going to update its value after search sub-work complete
similarityHolder := utils.AtomicFloat64{}
similarityHolder.Store(config.similarity)

topKQueue := topKQueuePool.Get().(TopKQueue)
topKQueue.Reset(config.topK)
defer topKQueuePool.Put(topKQueue)

// channel that feeds candidate lengths (sizeB) to the workers; each worker performs a search on one length segment at a time
sizeCh := make(chan int, bMax-bMin+1)
workerPool := errgroup.Group{}
lock := sync.Mutex{}

for i := 0; i < utils.Min(maxSearchQueriesAtOnce, bMax-bMin+1); i++ {
workerPool.Go(func() error {
for sizeB := range sizeCh {
similarity := similarityHolder.Load()
similarity := utils.MaxFloat64(config.similarity, topKQueue.GetLowestScore())
threshold := config.metric.Threshold(similarity, sizeA, sizeB)

// it means that the similarity has been changed and we will skip this value processing
Expand All @@ -88,29 +82,14 @@ func (n *nGramSuggester) Suggest(config SearchConfig) ([]Candidate, error) {
continue
}

queue := topKQueuePool.Get().(TopKQueue)
queue.Reset(config.topK)

collector := &fuzzyCollector{
topKQueue: queue,
topKQueue: topKQueue,
scorer: NewMetricScorer(config.metric, sizeA, sizeB),
}

if err := n.searcher.Search(invertedIndex, set, threshold, collector); err != nil {
return fmt.Errorf("failed to search posting lists: %w", err)
}

lock.Lock()

topKQueue.Merge(queue)

if topKQueue.IsFull() && similarityHolder.Load() < topKQueue.GetLowestScore() {
similarityHolder.Store(topKQueue.GetLowestScore())
}

lock.Unlock()

topKQueuePool.Put(queue)
}

return nil
Expand Down
37 changes: 33 additions & 4 deletions pkg/suggest/topk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package suggest
import (
"container/heap"
"math"
"sync"

"github.com/suggest-go/suggest/pkg/index"
)
Expand Down Expand Up @@ -66,13 +67,15 @@ func (h topKHeap) updateTop(candidate Candidate) {
type topKQueue struct {
// topK is the capacity target of the queue: IsFull compares the heap
// length against it.
topK int
// h is the heap that stores the collected candidates.
h topKHeap
// lock is taken by the queue's methods around reads and writes of h.
lock sync.RWMutex
}

// NewTopKQueue returns an instance of TopKQueue configured for topK items.
// The underlying heap starts empty with capacity topK pre-allocated.
func NewTopKQueue(topK int) TopKQueue {
	// The lock field is intentionally left out of the literal: the zero value
	// of sync.RWMutex is an unlocked mutex that is ready for use.
	return &topKQueue{
		topK: topK,
		h:    make(topKHeap, 0, topK),
	}
}

Expand All @@ -89,19 +92,30 @@ func (c *topKQueue) Add(position index.Position, score float64) {
Score: score,
}

if c.h.Len() < c.topK {
if !c.IsFull() {
c.lock.Lock()
heap.Push(&c.h, candidate)
c.lock.Unlock()

return
}

if c.h.top().Less(candidate) {
c.lock.RLock()
isLess := c.h.top().Less(candidate)
c.lock.RUnlock()

if isLess {
c.lock.Lock()
c.h.updateTop(candidate)
c.lock.Unlock()
}
}

// GetLowestScore returns the lowest score of the collected candidates
func (c *topKQueue) GetLowestScore() float64 {
c.lock.RLock()
defer c.lock.RUnlock()

if c.h.Len() > 0 {
return c.h.top().Score
}
Expand All @@ -115,16 +129,27 @@ func (c *topKQueue) CanTakeWithScore(score float64) bool {
return true
}

return c.h.top().Score <= score
c.lock.RLock()
canTake := c.h.top().Score <= score
c.lock.RUnlock()

return canTake
}

// IsFull tells if selector has collected topK elements.
//
// The heap length is read under the read lock so that the check does not race
// with writers that hold c.lock. sync.RWMutex is not reentrant, so this method
// must not be called by a goroutine that already holds c.lock for writing.
func (c *topKQueue) IsFull() bool {
	c.lock.RLock()
	defer c.lock.RUnlock()

	return c.h.Len() == c.topK
}

// GetCandidates returns `top k items`
func (c *topKQueue) GetCandidates() []Candidate {
c.lock.Lock()
defer c.lock.Unlock()

if c.h.Len() == 0 {
return []Candidate{}
}
Expand All @@ -151,10 +176,14 @@ func (c *topKQueue) Merge(other TopKQueue) {
topK, ok := other.(*topKQueue)

if ok {
c.lock.Lock()

for _, item := range topK.h {
c.Add(item.Key, item.Score)
}

c.lock.Unlock()

return
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ func Min(a, b int) int {
return a
}

// MaxFloat64 reports the larger of the two given values.
// b is returned whenever a is not strictly greater than b, which includes
// the case where either operand is NaN.
func MaxFloat64(a, b float64) float64 {
	larger := b
	if a > b {
		larger = a
	}

	return larger
}

// Pack packs 2 uint32 into uint64
func Pack(a, b uint32) uint64 {
return (uint64(a) << 32) | uint64(b&math.MaxUint32)
Expand Down

0 comments on commit c5af0db

Please sign in to comment.