Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate CollectorManager into Suggest method. #62

Merged
merged 1 commit into from
Dec 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions pkg/spellchecker/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package spellchecker

import (
"errors"

"github.com/suggest-go/suggest/pkg/lm"
"github.com/suggest-go/suggest/pkg/merger"
"github.com/suggest-go/suggest/pkg/suggest"
Expand All @@ -13,38 +15,46 @@ type lmCollector struct {
}

// newCollectorManager creates a new instance of lm CollectorManger.
func newCollectorManager(scorer suggest.Scorer, topK int) suggest.CollectorManager {
func newCollectorManager(scorer suggest.Scorer, queueFactory func() suggest.TopKQueue) suggest.CollectorManager {
return &lmCollectorManager{
topK: topK,
scorer: scorer,
scorer: scorer,
queueFactory: queueFactory,
globalQueue: queueFactory(),
}
}

// lmCollectorManager implements CollectorManager interface
type lmCollectorManager struct {
topK int
scorer suggest.Scorer
scorer suggest.Scorer
queueFactory func() suggest.TopKQueue
globalQueue suggest.TopKQueue
}

// Create creates a new collector that will be used for a search segment
func (l *lmCollectorManager) Create() (suggest.Collector, error) {
func (l *lmCollectorManager) Create() suggest.Collector {
return &lmCollector{
topKQueue: suggest.NewTopKQueue(l.topK),
topKQueue: l.queueFactory(),
scorer: l.scorer,
}, nil
}
}

// Reduce reduces the result from the given list of collectors
func (l *lmCollectorManager) Reduce(collectors []suggest.Collector) []suggest.Candidate {
topKQueue := suggest.NewTopKQueue(l.topK)

func (l *lmCollectorManager) Collect(collectors ...suggest.Collector) error {
for _, c := range collectors {
if collector, ok := c.(*lmCollector); ok {
topKQueue.Merge(collector.topKQueue)
collector, ok := c.(*lmCollector)

if !ok {
return errors.New("expected collector created by lmCollectorManager")
}

l.globalQueue.Merge(collector.topKQueue)
}

return topKQueue.GetCandidates()
return nil
}

func (l *lmCollectorManager) GetCandidates() []suggest.Candidate {
return l.globalQueue.GetCandidates()
}

// Collect collects the given candidate
Expand Down
23 changes: 12 additions & 11 deletions pkg/spellchecker/spellchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,32 @@ func (s *SpellChecker) Predict(query string, topK int, similarity float64) ([]st
return nil, err
}

collectorManager := newCollectorManager(newScorer(scorerNext), topK)
candidates, err := s.index.Autocomplete(word, collectorManager)
queueFactory := func() suggest.TopKQueue {
return suggest.NewTopKQueue(topK)
}

candidates, err := s.index.Autocomplete(word, func() suggest.CollectorManager {
return newCollectorManager(newScorer(scorerNext), queueFactory)
})

if err != nil {
return nil, err
}

if len(candidates) < topK {
config, err := suggest.NewSearchConfig(
fuzzyCandidates, err := s.index.Suggest(
word,
topK,
metric.CosineMetric(),
similarity,
metric.CosineMetric(),
func() suggest.CollectorManager {
return suggest.NewFuzzyCollectorManager(queueFactory)
},
)

if err != nil {
return nil, err
}

fuzzyCandidates, err := s.index.Suggest(config)

if err != nil {
return nil, err
}

candidates = merge(candidates, fuzzyCandidates)
}

Expand Down
27 changes: 15 additions & 12 deletions pkg/suggest/autocomplete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package suggest

import (
"fmt"
"sync"

"github.com/suggest-go/suggest/pkg/analysis"
"github.com/suggest-go/suggest/pkg/index"
Expand All @@ -12,7 +13,7 @@ import (
// for candidates search
type Autocomplete interface {
// Autocomplete returns candidates where the query string is a substring of each candidate
Autocomplete(query string, collectorManager CollectorManager) ([]Candidate, error)
Autocomplete(query string, factory CollectorManagerFactory) ([]Candidate, error)
}

// NewAutocomplete creates a new instance of Autocomplete
Expand All @@ -36,11 +37,12 @@ type nGramAutocomplete struct {
}

// Autocomplete returns candidates where the query string is a prefix of each candidate
func (n *nGramAutocomplete) Autocomplete(query string, collectorManager CollectorManager) ([]Candidate, error) {
func (n *nGramAutocomplete) Autocomplete(query string, factory CollectorManagerFactory) ([]Candidate, error) {
set := n.tokenizer.Tokenize(query)
lenSet := len(set)
collectors := []Collector{}
workerPool := errgroup.Group{}
collectorManager := factory()
locker := sync.Mutex{}

for size := lenSet; size < n.indices.Size(); size++ {
invertedIndex := n.indices.Get(size)
Expand All @@ -49,26 +51,27 @@ func (n *nGramAutocomplete) Autocomplete(query string, collectorManager Collecto
continue
}

collector, err := collectorManager.Create()

if err != nil {
return nil, fmt.Errorf("failed to create a collector: %w", err)
}
collector := collectorManager.Create()

workerPool.Go(func() error {
if err = n.searcher.Search(invertedIndex, set, lenSet, collector); err != nil {
if err := n.searcher.Search(invertedIndex, set, lenSet, collector); err != nil {
return fmt.Errorf("failed to search posting lists: %w", err)
}

locker.Lock()
defer locker.Unlock()

if err := collectorManager.Collect(collector); err != nil {
return err
}

return nil
})

collectors = append(collectors, collector)
}

if err := workerPool.Wait(); err != nil {
return nil, err
}

return collectorManager.Reduce(collectors), nil
return collectorManager.GetCandidates(), nil
}
136 changes: 99 additions & 37 deletions pkg/suggest/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package suggest

import (
"errors"
"math"

"github.com/suggest-go/suggest/pkg/index"
"github.com/suggest-go/suggest/pkg/merger"
)
Expand All @@ -27,18 +30,21 @@ type Collector interface {
merger.Collector
// SetScorer sets a scorer before collection starts
SetScorer(scorer Scorer)
// GetCandidates returns the list of collected candidates
GetCandidates() []Candidate
}

// CollectorManager is responsible for creating collectors and reducing them into the result set
type CollectorManager interface {
// Create creates a new collector that will be used for a search segment
Create() (Collector, error)
// Reduce reduces the result from the given list of collectors
Reduce(collectors []Collector) []Candidate
Create() Collector
// Collect returns back the given collectors.
Collect(collectors ...Collector) error
// GetCandidates returns currently collected candidates.
GetCandidates() []Candidate
}

// CollectorManagerFactory is a factory method for creating a new instance of CollectorManager.
type CollectorManagerFactory func() CollectorManager

type firstKCollector struct {
limit int
items []merger.MergeCandidate
Expand All @@ -51,7 +57,6 @@ func (c *firstKCollector) Collect(item merger.MergeCandidate) error {
}

c.items = append(c.items, item)

return nil
}

Expand All @@ -60,50 +65,53 @@ func (c *firstKCollector) SetScorer(scorer Scorer) {
return
}

// GetCandidates returns the list of collected candidates
func (c *firstKCollector) GetCandidates() []Candidate {
result := make([]Candidate, 0, len(c.items))

for _, item := range c.items {
result = append(result, Candidate{
Key: item.Position(),
})
// NewFirstKCollectorManager creates a new instance of CollectorManager with firstK collectors
func NewFirstKCollectorManager(limit int, queue TopKQueue) *FirstKCollectorManager {
return &FirstKCollectorManager{
limit: limit,
queue: queue,
}

return result
}

// NewFirstKCollectorManager creates a new instance of CollectorManager with firstK collectors
func NewFirstKCollectorManager(limit int) CollectorManager {
return &firstKCollectorManager{
limit: limit,
func newFirstKCollectorManager(limit int) CollectorManagerFactory {
return func() CollectorManager {
return NewFirstKCollectorManager(limit, NewTopKQueue(limit))
}
}

type firstKCollectorManager struct {
// FirstKCollectorManager represents first k collector manager.
type FirstKCollectorManager struct {
limit int
queue TopKQueue
}

// Create creates a new collector that will be used for a search segment
func (m *firstKCollectorManager) Create() (Collector, error) {
func (m *FirstKCollectorManager) Create() Collector {
return &firstKCollector{
limit: m.limit,
}, nil
}
}

// Reduce reduces the result from the given list of collectors
func (m *firstKCollectorManager) Reduce(collectors []Collector) []Candidate {
topKQueue := NewTopKQueue(m.limit)
// Collect returns back the given collectors.
func (m *FirstKCollectorManager) Collect(collectors ...Collector) error {
for _, item := range collectors {
collector, ok := item.(*firstKCollector)

if !ok {
return errors.New("expected Collector created by FirstKCollectorManager")
}

for _, c := range collectors {
if collector, ok := c.(*firstKCollector); ok {
for _, item := range collector.items {
topKQueue.Add(item.Position(), -float64(item.Position()))
}
for _, candidate := range collector.items {
m.queue.Add(candidate.Position(), -float64(candidate.Position()))
}
}

return topKQueue.GetCandidates()
return nil
}

// GetCandidates returns currently collected candidates.
func (m *FirstKCollectorManager) GetCandidates() []Candidate {
return m.queue.GetCandidates()
}

type fuzzyCollector struct {
Expand All @@ -119,11 +127,65 @@ func (c *fuzzyCollector) Collect(item merger.MergeCandidate) error {
return nil
}

// GetCandidates returns `top k items`
func (c *fuzzyCollector) GetCandidates() []Candidate {
return c.topKQueue.GetCandidates()
}

// Score returns the score of the given position
func (c *fuzzyCollector) SetScorer(scorer Scorer) {
c.scorer = scorer
}

// NewFuzzyCollectorManager creates a new instance of FuzzyCollectorManager.
func NewFuzzyCollectorManager(queueFactory func() TopKQueue) *FuzzyCollectorManager {
return &FuzzyCollectorManager{
queueFactory: queueFactory,
globalQueue: queueFactory(),
}
}

func newFuzzyCollectorManager(topK int) CollectorManagerFactory {
return func() CollectorManager {
return NewFuzzyCollectorManager(func() TopKQueue {
return NewTopKQueue(topK)
})
}
}

// FuzzyCollectorManager represents fuzzy collector manager.
type FuzzyCollectorManager struct {
queueFactory func() TopKQueue
globalQueue TopKQueue
}

// Create creates a new collector that will be used for a search segment
func (m *FuzzyCollectorManager) Create() Collector {
return &fuzzyCollector{
topKQueue: m.queueFactory(),
}
}

// Collect returns back the given collectors.
func (m *FuzzyCollectorManager) Collect(collectors ...Collector) error {
for _, item := range collectors {
collector, ok := item.(*fuzzyCollector)

if !ok {
return errors.New("expected Collector created by FirstKCollectorManager")
}

m.globalQueue.Merge(collector.topKQueue)
}

return nil
}

// GetCandidates returns currently collected candidates.
func (m *FuzzyCollectorManager) GetCandidates() []Candidate {
return m.globalQueue.GetCandidates()
}

// GetLowestScore returns the lowest collected score.
func (m *FuzzyCollectorManager) GetLowestScore() float64 {
if !m.globalQueue.IsFull() {
return math.Inf(-1)
}

return m.globalQueue.GetLowestScore()
}
Loading