Skip to content

Commit

Permalink
Integrate CollectorManager into Suggest method. (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
alldroll authored Dec 27, 2020
1 parent 3c76c6c commit d771f3e
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 146 deletions.
38 changes: 24 additions & 14 deletions pkg/spellchecker/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package spellchecker

import (
"errors"

"github.com/suggest-go/suggest/pkg/lm"
"github.com/suggest-go/suggest/pkg/merger"
"github.com/suggest-go/suggest/pkg/suggest"
Expand All @@ -13,38 +15,46 @@ type lmCollector struct {
}

// newCollectorManager creates a new instance of lm CollectorManger.
func newCollectorManager(scorer suggest.Scorer, topK int) suggest.CollectorManager {
func newCollectorManager(scorer suggest.Scorer, queueFactory func() suggest.TopKQueue) suggest.CollectorManager {
return &lmCollectorManager{
topK: topK,
scorer: scorer,
scorer: scorer,
queueFactory: queueFactory,
globalQueue: queueFactory(),
}
}

// lmCollectorManager implements CollectorManager interface
type lmCollectorManager struct {
topK int
scorer suggest.Scorer
scorer suggest.Scorer
queueFactory func() suggest.TopKQueue
globalQueue suggest.TopKQueue
}

// Create creates a new collector that will be used for a search segment
func (l *lmCollectorManager) Create() (suggest.Collector, error) {
func (l *lmCollectorManager) Create() suggest.Collector {
return &lmCollector{
topKQueue: suggest.NewTopKQueue(l.topK),
topKQueue: l.queueFactory(),
scorer: l.scorer,
}, nil
}
}

// Reduce reduces the result from the given list of collectors
func (l *lmCollectorManager) Reduce(collectors []suggest.Collector) []suggest.Candidate {
topKQueue := suggest.NewTopKQueue(l.topK)

func (l *lmCollectorManager) Collect(collectors ...suggest.Collector) error {
for _, c := range collectors {
if collector, ok := c.(*lmCollector); ok {
topKQueue.Merge(collector.topKQueue)
collector, ok := c.(*lmCollector)

if !ok {
return errors.New("expected collector created by lmCollectorManager")
}

l.globalQueue.Merge(collector.topKQueue)
}

return topKQueue.GetCandidates()
return nil
}

func (l *lmCollectorManager) GetCandidates() []suggest.Candidate {
return l.globalQueue.GetCandidates()
}

// Collect collects the given candidate
Expand Down
23 changes: 12 additions & 11 deletions pkg/spellchecker/spellchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,32 @@ func (s *SpellChecker) Predict(query string, topK int, similarity float64) ([]st
return nil, err
}

collectorManager := newCollectorManager(newScorer(scorerNext), topK)
candidates, err := s.index.Autocomplete(word, collectorManager)
queueFactory := func() suggest.TopKQueue {
return suggest.NewTopKQueue(topK)
}

candidates, err := s.index.Autocomplete(word, func() suggest.CollectorManager {
return newCollectorManager(newScorer(scorerNext), queueFactory)
})

if err != nil {
return nil, err
}

if len(candidates) < topK {
config, err := suggest.NewSearchConfig(
fuzzyCandidates, err := s.index.Suggest(
word,
topK,
metric.CosineMetric(),
similarity,
metric.CosineMetric(),
func() suggest.CollectorManager {
return suggest.NewFuzzyCollectorManager(queueFactory)
},
)

if err != nil {
return nil, err
}

fuzzyCandidates, err := s.index.Suggest(config)

if err != nil {
return nil, err
}

candidates = merge(candidates, fuzzyCandidates)
}

Expand Down
27 changes: 15 additions & 12 deletions pkg/suggest/autocomplete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package suggest

import (
"fmt"
"sync"

"github.com/suggest-go/suggest/pkg/analysis"
"github.com/suggest-go/suggest/pkg/index"
Expand All @@ -12,7 +13,7 @@ import (
// for candidates search
type Autocomplete interface {
// Autocomplete returns candidates where the query string is a substring of each candidate
Autocomplete(query string, collectorManager CollectorManager) ([]Candidate, error)
Autocomplete(query string, factory CollectorManagerFactory) ([]Candidate, error)
}

// NewAutocomplete creates a new instance of Autocomplete
Expand All @@ -36,11 +37,12 @@ type nGramAutocomplete struct {
}

// Autocomplete returns candidates where the query string is a prefix of each candidate
func (n *nGramAutocomplete) Autocomplete(query string, collectorManager CollectorManager) ([]Candidate, error) {
func (n *nGramAutocomplete) Autocomplete(query string, factory CollectorManagerFactory) ([]Candidate, error) {
set := n.tokenizer.Tokenize(query)
lenSet := len(set)
collectors := []Collector{}
workerPool := errgroup.Group{}
collectorManager := factory()
locker := sync.Mutex{}

for size := lenSet; size < n.indices.Size(); size++ {
invertedIndex := n.indices.Get(size)
Expand All @@ -49,26 +51,27 @@ func (n *nGramAutocomplete) Autocomplete(query string, collectorManager Collecto
continue
}

collector, err := collectorManager.Create()

if err != nil {
return nil, fmt.Errorf("failed to create a collector: %w", err)
}
collector := collectorManager.Create()

workerPool.Go(func() error {
if err = n.searcher.Search(invertedIndex, set, lenSet, collector); err != nil {
if err := n.searcher.Search(invertedIndex, set, lenSet, collector); err != nil {
return fmt.Errorf("failed to search posting lists: %w", err)
}

locker.Lock()
defer locker.Unlock()

if err := collectorManager.Collect(collector); err != nil {
return err
}

return nil
})

collectors = append(collectors, collector)
}

if err := workerPool.Wait(); err != nil {
return nil, err
}

return collectorManager.Reduce(collectors), nil
return collectorManager.GetCandidates(), nil
}
136 changes: 99 additions & 37 deletions pkg/suggest/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package suggest

import (
"errors"
"math"

"github.com/suggest-go/suggest/pkg/index"
"github.com/suggest-go/suggest/pkg/merger"
)
Expand All @@ -27,18 +30,21 @@ type Collector interface {
merger.Collector
// SetScorer sets a scorer before collection starts
SetScorer(scorer Scorer)
// GetCandidates returns the list of collected candidates
GetCandidates() []Candidate
}

// CollectorManager is responsible for creating collectors and reducing them into the result set
type CollectorManager interface {
// Create creates a new collector that will be used for a search segment
Create() (Collector, error)
// Reduce reduces the result from the given list of collectors
Reduce(collectors []Collector) []Candidate
Create() Collector
// Collect returns back the given collectors.
Collect(collectors ...Collector) error
// GetCandidates returns currently collected candidates.
GetCandidates() []Candidate
}

// CollectorManagerFactory is a factory method for creating a new instance of CollectorManager.
type CollectorManagerFactory func() CollectorManager

type firstKCollector struct {
limit int
items []merger.MergeCandidate
Expand All @@ -51,7 +57,6 @@ func (c *firstKCollector) Collect(item merger.MergeCandidate) error {
}

c.items = append(c.items, item)

return nil
}

Expand All @@ -60,50 +65,53 @@ func (c *firstKCollector) SetScorer(scorer Scorer) {
return
}

// GetCandidates returns the list of collected candidates
func (c *firstKCollector) GetCandidates() []Candidate {
result := make([]Candidate, 0, len(c.items))

for _, item := range c.items {
result = append(result, Candidate{
Key: item.Position(),
})
// NewFirstKCollectorManager creates a new instance of CollectorManager with firstK collectors
func NewFirstKCollectorManager(limit int, queue TopKQueue) *FirstKCollectorManager {
return &FirstKCollectorManager{
limit: limit,
queue: queue,
}

return result
}

// NewFirstKCollectorManager creates a new instance of CollectorManager with firstK collectors
func NewFirstKCollectorManager(limit int) CollectorManager {
return &firstKCollectorManager{
limit: limit,
func newFirstKCollectorManager(limit int) CollectorManagerFactory {
return func() CollectorManager {
return NewFirstKCollectorManager(limit, NewTopKQueue(limit))
}
}

type firstKCollectorManager struct {
// FirstKCollectorManager represents first k collector manager.
type FirstKCollectorManager struct {
limit int
queue TopKQueue
}

// Create creates a new collector that will be used for a search segment
func (m *firstKCollectorManager) Create() (Collector, error) {
func (m *FirstKCollectorManager) Create() Collector {
return &firstKCollector{
limit: m.limit,
}, nil
}
}

// Reduce reduces the result from the given list of collectors
func (m *firstKCollectorManager) Reduce(collectors []Collector) []Candidate {
topKQueue := NewTopKQueue(m.limit)
// Collect returns back the given collectors.
func (m *FirstKCollectorManager) Collect(collectors ...Collector) error {
for _, item := range collectors {
collector, ok := item.(*firstKCollector)

if !ok {
return errors.New("expected Collector created by FirstKCollectorManager")
}

for _, c := range collectors {
if collector, ok := c.(*firstKCollector); ok {
for _, item := range collector.items {
topKQueue.Add(item.Position(), -float64(item.Position()))
}
for _, candidate := range collector.items {
m.queue.Add(candidate.Position(), -float64(candidate.Position()))
}
}

return topKQueue.GetCandidates()
return nil
}

// GetCandidates returns currently collected candidates.
func (m *FirstKCollectorManager) GetCandidates() []Candidate {
return m.queue.GetCandidates()
}

type fuzzyCollector struct {
Expand All @@ -119,11 +127,65 @@ func (c *fuzzyCollector) Collect(item merger.MergeCandidate) error {
return nil
}

// GetCandidates returns `top k items`
func (c *fuzzyCollector) GetCandidates() []Candidate {
return c.topKQueue.GetCandidates()
}

// Score returns the score of the given position
func (c *fuzzyCollector) SetScorer(scorer Scorer) {
c.scorer = scorer
}

// NewFuzzyCollectorManager creates a new instance of FuzzyCollectorManager.
func NewFuzzyCollectorManager(queueFactory func() TopKQueue) *FuzzyCollectorManager {
return &FuzzyCollectorManager{
queueFactory: queueFactory,
globalQueue: queueFactory(),
}
}

func newFuzzyCollectorManager(topK int) CollectorManagerFactory {
return func() CollectorManager {
return NewFuzzyCollectorManager(func() TopKQueue {
return NewTopKQueue(topK)
})
}
}

// FuzzyCollectorManager represents fuzzy collector manager.
type FuzzyCollectorManager struct {
queueFactory func() TopKQueue
globalQueue TopKQueue
}

// Create creates a new collector that will be used for a search segment
func (m *FuzzyCollectorManager) Create() Collector {
return &fuzzyCollector{
topKQueue: m.queueFactory(),
}
}

// Collect returns back the given collectors.
func (m *FuzzyCollectorManager) Collect(collectors ...Collector) error {
for _, item := range collectors {
collector, ok := item.(*fuzzyCollector)

if !ok {
return errors.New("expected Collector created by FirstKCollectorManager")
}

m.globalQueue.Merge(collector.topKQueue)
}

return nil
}

// GetCandidates returns currently collected candidates.
func (m *FuzzyCollectorManager) GetCandidates() []Candidate {
return m.globalQueue.GetCandidates()
}

// GetLowestScore returns the lowest collected score.
func (m *FuzzyCollectorManager) GetLowestScore() float64 {
if !m.globalQueue.IsFull() {
return math.Inf(-1)
}

return m.globalQueue.GetLowestScore()
}
Loading

0 comments on commit d771f3e

Please sign in to comment.