From 2bf14521e72acd8b0ff95772329348e66560afb7 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Wed, 11 Dec 2019 00:21:48 +0800 Subject: [PATCH 1/9] Refactor code indexer --- modules/indexer/code/bleve.go | 482 ++++++++++++++------- modules/indexer/code/bleve_test.go | 58 +++ modules/indexer/code/indexer.go | 110 +++-- modules/indexer/code/queue.go | 137 ++++++ modules/indexer/code/repo.go | 214 --------- modules/{search => indexer/code}/search.go | 7 +- routers/home.go | 8 +- routers/init.go | 2 +- routers/repo/search.go | 4 +- 9 files changed, 584 insertions(+), 438 deletions(-) create mode 100644 modules/indexer/code/queue.go rename modules/{search => indexer/code}/search.go (92%) diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index bb2fc5bc74327..2ae637b540063 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -5,11 +5,12 @@ package code import ( + "context" "fmt" "os" "strconv" "strings" - "time" + "sync" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" @@ -18,142 +19,72 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/analyzer/custom" + "github.com/blevesearch/bleve/analysis/token/lowercase" + "github.com/blevesearch/bleve/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/analysis/tokenizer/unicode" + "github.com/blevesearch/bleve/index/upsidedown" + "github.com/blevesearch/bleve/mapping" + "github.com/blevesearch/bleve/search/query" "github.com/ethantkoenig/rupture" ) -type repoIndexerOperation struct { - repoID int64 - deleted bool - watchers []chan<- error -} - -var repoIndexerOperationQueue chan repoIndexerOperation - -// InitRepoIndexer initialize the repo indexer -func InitRepoIndexer() { - if !setting.Indexer.RepoIndexerEnabled { - return - } - waitChannel := make(chan time.Duration) - // FIXME: graceful: This should use a persistable queue - repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) - go func() { - start := time.Now() - log.Info("PID: %d: Initializing Repository Indexer", os.Getpid()) - initRepoIndexer(populateRepoIndexerAsynchronously) - go processRepoIndexerOperationQueue() - waitChannel <- time.Since(start) - }() - if setting.Indexer.StartupTimeout > 0 { - go func() { - timeout := setting.Indexer.StartupTimeout - if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { - timeout += setting.GracefulHammerTime - } - select { - case duration := <-waitChannel: - log.Info("Repository Indexer Initialization took %v", duration) - case <-time.After(timeout): - log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) - } - }() - - } +// indexerID a bleve-compatible unique identifier for an integer id +func indexerID(id int64) string { + return strconv.FormatInt(id, 36) } -// populateRepoIndexerAsynchronously asynchronously populates the repo indexer -// with pre-existing data. This should only be run when the indexer is created -// for the first time. -func populateRepoIndexerAsynchronously() error { - exist, err := models.IsTableNotEmpty("repository") - if err != nil { - return err - } else if !exist { - return nil - } - - var maxRepoID int64 - if maxRepoID, err = models.GetMaxID("repository"); err != nil { - return err - } - go populateRepoIndexer(maxRepoID) - return nil +// numericEqualityQuery a numeric equality query for the given value and field +func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { + f := float64(value) + tru := true + q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) + q.SetField(field) + return q } -// populateRepoIndexer populate the repo indexer with pre-existing data. This -// should only be run when the indexer is created for the first time. -// FIXME: graceful: This should use a persistable queue -func populateRepoIndexer(maxRepoID int64) { - log.Info("Populating the repo indexer with existing repositories") - - isShutdown := graceful.GetManager().IsShutdown() +const unicodeNormalizeName = "unicodeNormalize" - // start with the maximum existing repo ID and work backwards, so that we - // don't include repos that are created after gitea starts; such repos will - // already be added to the indexer, and we don't need to add them again. - for maxRepoID > 0 { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) - if err != nil { - log.Error("populateRepoIndexer: %v", err) - return - } else if len(ids) == 0 { - break - } - for _, id := range ids { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: id, - deleted: false, - } - maxRepoID = id - 1 - } - } - log.Info("Done (re)populating the repo indexer with existing repositories") +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) } -func updateRepoIndexer(repoID int64) error { - repo, err := models.GetRepositoryByID(repoID) - if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err) +const maxBatchSize = 16 + +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, error) { + _, err := os.Stat(setting.Indexer.IssuePath) + if err != nil && os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err } - sha, err := getDefaultBranchSha(repo) + metadata, err := rupture.ReadIndexMetadata(path) if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err) + return nil, err } - changes, err := getRepoChanges(repo, sha) - if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) - } else if changes == nil { - return nil + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, os.RemoveAll(path) } - batch := RepoIndexerBatch() - for _, update := range changes.Updates { - if err := addUpdate(update, repo, batch); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err) - } - } - for _, filename := range changes.RemovedFilenames { - if err := addDelete(filename, repo, batch); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err) - } - } - if err = batch.Flush(); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) + index, err := bleve.Open(path) + if err != nil && err == upsidedown.IncompatibleVersion { + // the indexer was built with a previous version of bleve, so we should + // delete it and re-populate + return nil, os.RemoveAll(path) + } else if err != nil { + return nil, err } - return repo.UpdateIndexerStatus(sha) + return index, nil } // repoChanges changes (file additions/updates/removals) to a repo @@ -288,7 +219,7 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, // previous commit sha may have been removed by a force push, so // try rebuilding from scratch log.Warn("git diff: %v", err) - if err = deleteRepoFromIndexer(repo.ID); err != nil { + if err = indexer.Delete(repo.ID); err != nil { return nil, err } return genesisChanges(repo, revision) @@ -330,51 +261,288 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, return &changes, err } -func processRepoIndexerOperationQueue() { - for { - select { - case op := <-repoIndexerOperationQueue: - var err error - if op.deleted { - if err = deleteRepoFromIndexer(op.repoID); err != nil { - log.Error("DeleteRepoFromIndexer: %v", err) - } - } else { - if err = updateRepoIndexer(op.repoID); err != nil { - log.Error("updateRepoIndexer: %v", err) - } - } - for _, watcher := range op.watchers { - watcher <- err +const ( + repoIndexerAnalyzer = "repoIndexerAnalyzer" + repoIndexerDocType = "repoIndexerDocType" + repoIndexerLatestVersion = 4 +) + +type bleveIndexerHolder struct { + index bleve.Index + mutex sync.RWMutex + cond *sync.Cond +} + +func newBleveIndexerHolder() *bleveIndexerHolder { + b := &bleveIndexerHolder{} + b.cond = sync.NewCond(b.mutex.RLocker()) + return b +} + +func (r *bleveIndexerHolder) set(index bleve.Index) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.index = index + r.cond.Broadcast() +} + +func (r *bleveIndexerHolder) get() bleve.Index { + r.mutex.RLock() + defer r.mutex.RUnlock() + if r.index == nil { + r.cond.Wait() + } + return r.index +} + +// repoIndexer (thread-safe) index for repository contents +var indexerHolder = newBleveIndexerHolder() + +// RepoIndexerOp type of operation to perform on repo indexer +type RepoIndexerOp int + +const ( + // RepoIndexerOpUpdate add/update a file's contents + RepoIndexerOpUpdate = iota + + // RepoIndexerOpDelete delete a file + RepoIndexerOpDelete +) + +// RepoIndexerData data stored in the repo indexer +type RepoIndexerData struct { + RepoID int64 + Content string +} + +// Type returns the document type, for bleve's mapping.Classifier interface. +func (d *RepoIndexerData) Type() string { + return repoIndexerDocType +} + +// RepoIndexerUpdate an update to the repo indexer +type RepoIndexerUpdate struct { + Filepath string + Op RepoIndexerOp + Data *RepoIndexerData +} + +// AddToFlushingBatch adds the update to the given flushing batch. +func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { + id := filenameIndexerID(update.Data.RepoID, update.Filepath) + switch update.Op { + case RepoIndexerOpUpdate: + return batch.Index(id, update.Data) + case RepoIndexerOpDelete: + return batch.Delete(id) + default: + log.Error("Unrecognized repo indexer op: %d", update.Op) + } + return nil +} + +// createRepoIndexer create a repo indexer if one does not already exist +func createRepoIndexer(path string, latestVersion int) error { + docMapping := bleve.NewDocumentMapping() + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) + + textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("Content", textFieldMapping) + + mapping := bleve.NewIndexMapping() + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { + return err + } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + "type": custom.Name, + "char_filters": []string{}, + "tokenizer": unicode.Name, + "token_filters": []string{unicodeNormalizeName, lowercase.Name}, + }); err != nil { + return err + } + mapping.DefaultAnalyzer = repoIndexerAnalyzer + mapping.AddDocumentMapping(repoIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) + + indexer, err := bleve.New(path, mapping) + if err != nil { + return err + } + indexerHolder.set(indexer) + + return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ + Version: latestVersion, + }) +} + +func filenameIndexerID(repoID int64, filename string) string { + return indexerID(repoID) + "_" + filename +} + +func filenameOfIndexerID(indexerID string) string { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + return indexerID[index+1:] +} + +var ( + _ Indexer = &BleveIndexer{} +) + +// BleveIndexer represents a bleve indexer implementation +type BleveIndexer struct { + indexDir string + indexer bleve.Index // indexer (thread-safe) index for repository contents +} + +// NewBleveIndexer creates a new bleve local indexer +func NewBleveIndexer(indexDir string) *BleveIndexer { + return &BleveIndexer{ + indexDir: indexDir, + } +} + +// Init init the indexer +func (b *BleveIndexer) Init() (bool, error) { + indexer, err := openIndexer(b.indexDir, repoIndexerLatestVersion) + if err != nil { + log.Fatal("openIndexer: %v", err) + } + if indexer != nil { + indexerHolder.set(indexer) + closeAtTerminate() + return false, nil + } + + err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) + closeAtTerminate() + return err == nil, err +} + +func closeAtTerminate() { + graceful.GetManager().RunAtTerminate(context.Background(), func() { + log.Debug("Closing repo indexer") + indexer := indexerHolder.get() + if indexer != nil { + err := indexer.Close() + if err != nil { + log.Error("Error whilst closing the repository indexer: %v", err) } - case <-graceful.GetManager().IsShutdown(): - log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) - return } + log.Info("PID: %d Repository Indexer closed", os.Getpid()) + }) +} +// Index indexes the data +func (b *BleveIndexer) Index(repoID int64) error { + repo, err := models.GetRepositoryByID(repoID) + if err != nil { + return err + } + + sha, err := getDefaultBranchSha(repo) + if err != nil { + return err + } + changes, err := getRepoChanges(repo, sha) + if err != nil { + return err + } else if changes == nil { + return nil } -} -// DeleteRepoFromIndexer remove all of a repository's entries from the indexer -func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) + batch := rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) + for _, update := range changes.Updates { + if err := addUpdate(update, repo, batch); err != nil { + return err + } + } + for _, filename := range changes.RemovedFilenames { + if err := addDelete(filename, repo, batch); err != nil { + return err + } + } + if err = batch.Flush(); err != nil { + return err + } + return repo.UpdateIndexerStatus(sha) } -// UpdateRepoIndexer update a repository's entries in the indexer -func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) +// Delete deletes indexes by ids +func (b *BleveIndexer) Delete(repoID int64) error { + query := numericEqualityQuery(repoID, "RepoID") + searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) + result, err := indexerHolder.get().Search(searchRequest) + if err != nil { + return err + } + batch := rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) + for _, hit := range result.Hits { + if err = batch.Delete(hit.ID); err != nil { + return err + } + } + return batch.Flush() } -func addOperationToQueue(op repoIndexerOperation) { - if !setting.Indexer.RepoIndexerEnabled { - return +// Search searches for files in the specified repo. +// Returns the matching file-paths +func (b *BleveIndexer) Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error) { + phraseQuery := bleve.NewMatchPhraseQuery(keyword) + phraseQuery.FieldVal = "Content" + phraseQuery.Analyzer = repoIndexerAnalyzer + + var indexerQuery query.Query + if len(repoIDs) > 0 { + var repoQueries = make([]query.Query, 0, len(repoIDs)) + for _, repoID := range repoIDs { + repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID")) + } + + indexerQuery = bleve.NewConjunctionQuery( + bleve.NewDisjunctionQuery(repoQueries...), + phraseQuery, + ) + } else { + indexerQuery = phraseQuery } - select { - case repoIndexerOperationQueue <- op: - break - default: - go func() { - repoIndexerOperationQueue <- op - }() + + from := (page - 1) * pageSize + searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false) + searchRequest.Fields = []string{"Content", "RepoID"} + searchRequest.IncludeLocations = true + + result, err := indexerHolder.get().Search(searchRequest) + if err != nil { + return 0, nil, err + } + + searchResults := make([]*SearchResult, len(result.Hits)) + for i, hit := range result.Hits { + var startIndex, endIndex int = -1, -1 + for _, locations := range hit.Locations["Content"] { + location := locations[0] + locationStart := int(location.Start) + locationEnd := int(location.End) + if startIndex < 0 || locationStart < startIndex { + startIndex = locationStart + } + if endIndex < 0 || locationEnd > endIndex { + endIndex = locationEnd + } + } + searchResults[i] = &SearchResult{ + RepoID: int64(hit.Fields["RepoID"].(float64)), + StartIndex: startIndex, + EndIndex: endIndex, + Filename: filenameOfIndexerID(hit.ID), + Content: hit.Fields["Content"].(string), + } } + return int64(result.Total), searchResults, nil } diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go index 2eafeef3c530c..cf00e4f376113 100644 --- a/modules/indexer/code/bleve_test.go +++ b/modules/indexer/code/bleve_test.go @@ -5,12 +5,70 @@ package code import ( + "os" "path/filepath" "testing" "code.gitea.io/gitea/models" + "github.com/stretchr/testify/assert" ) func TestMain(m *testing.M) { models.MainTest(m, filepath.Join("..", "..", "..")) } + +func TestIndexAndSearch(t *testing.T) { + dir := "./bleve.index" + indexer := NewBleveIndexer(dir) + defer os.RemoveAll(dir) + + _, err := indexer.Init() + assert.NoError(t, err) + + err = indexer.Index(2) + assert.NoError(t, err) + + var ( + keywords = []struct { + Keyword string + IDs []int64 + }{ + { + Keyword: "search", + IDs: []int64{1}, + }, + { + Keyword: "test1", + IDs: []int64{1}, + }, + { + Keyword: "test2", + IDs: []int64{1}, + }, + { + Keyword: "support", + IDs: []int64{1, 2}, + }, + { + Keyword: "chinese", + IDs: []int64{1, 2}, + }, + { + Keyword: "help", + IDs: []int64{}, + }, + } + ) + + for _, kw := range keywords { + total, res, err := indexer.Search(kw.IDs, kw.Keyword, 1, 10) + assert.NoError(t, err) + assert.EqualValues(t, len(kw.IDs), total) + + var ids = make([]int64, 0, len(res)) + for _, hit := range res { + ids = append(ids, hit.RepoID) + } + assert.EqualValues(t, kw.IDs, ids) + } +} diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 3907a7b57d60f..d25a9e79f14ea 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -5,72 +5,70 @@ package code import ( - "os" - "strconv" + "time" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" - - "github.com/blevesearch/bleve" - "github.com/blevesearch/bleve/analysis/token/unicodenorm" - "github.com/blevesearch/bleve/index/upsidedown" - "github.com/blevesearch/bleve/mapping" - "github.com/blevesearch/bleve/search/query" - "github.com/ethantkoenig/rupture" ) -// indexerID a bleve-compatible unique identifier for an integer id -func indexerID(id int64) string { - return strconv.FormatInt(id, 36) -} +var ( + indexer Indexer +) -// numericEqualityQuery a numeric equality query for the given value and field -func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { - f := float64(value) - tru := true - q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) - q.SetField(field) - return q +// SearchResult result of performing a search in a repo +type SearchResult struct { + RepoID int64 + StartIndex int + EndIndex int + Filename string + Content string } -const unicodeNormalizeName = "unicodeNormalize" - -func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { - return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ - "type": unicodenorm.Name, - "form": unicodenorm.NFC, - }) +// Indexer defines an inteface to indexer issues contents +type Indexer interface { + Init() (bool, error) + Index(repoID int64) error + Delete(repoID int64) error + Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error) } -const maxBatchSize = 16 - -// openIndexer open the index at the specified path, checking for metadata -// updates and bleve version updates. If index needs to be created (or -// re-created), returns (nil, nil) -func openIndexer(path string, latestVersion int) (bleve.Index, error) { - _, err := os.Stat(setting.Indexer.IssuePath) - if err != nil && os.IsNotExist(err) { - return nil, nil - } else if err != nil { - return nil, err - } - - metadata, err := rupture.ReadIndexMetadata(path) - if err != nil { - return nil, err - } - if metadata.Version < latestVersion { - // the indexer is using a previous version, so we should delete it and - // re-populate - return nil, os.RemoveAll(path) +// Init initialize the repo indexer +func Init() { + if !setting.Indexer.RepoIndexerEnabled { + return } + waitChannel := make(chan time.Duration) + repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) + go func() { + start := time.Now() + log.Info("Initializing Repository Indexer") + indexer = NewBleveIndexer(setting.Indexer.RepoPath) + created, err := indexer.Init() + if err != nil { + log.Fatal("indexer.Init: %v", err) + } + if created { + if err := populateRepoIndexerAsynchronously(); err != nil { + log.Fatal("PopulateRepoIndex: %v", err) + } + } + go processRepoIndexerOperationQueue() + waitChannel <- time.Since(start) + }() + if setting.Indexer.StartupTimeout > 0 { + go func() { + timeout := setting.Indexer.StartupTimeout + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case duration := <-waitChannel: + log.Info("Repository Indexer Initialization took %v", duration) + case <-time.After(timeout): + log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) + } + }() - index, err := bleve.Open(path) - if err != nil && err == upsidedown.IncompatibleVersion { - // the indexer was built with a previous version of bleve, so we should - // delete it and re-populate - return nil, os.RemoveAll(path) - } else if err != nil { - return nil, err } - return index, nil } diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go new file mode 100644 index 0000000000000..9a4de46ff54c8 --- /dev/null +++ b/modules/indexer/code/queue.go @@ -0,0 +1,137 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package code + +import ( + "os" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +type repoIndexerOperation struct { + repoID int64 + deleted bool + watchers []chan<- error +} + +var repoIndexerOperationQueue chan repoIndexerOperation + +func processRepoIndexerOperationQueue() { + for { + select { + case op := <-repoIndexerOperationQueue: + var err error + if op.deleted { + if err = indexer.Delete(op.repoID); err != nil { + log.Error("indexer.Delete: %v", err) + } + } else { + if err = indexer.Index(op.repoID); err != nil { + log.Error("indexer.Index: %v", err) + } + } + for _, watcher := range op.watchers { + watcher <- err + } + case <-graceful.GetManager().IsShutdown(): + log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) + return + } + } +} + +// DeleteRepoFromIndexer remove all of a repository's entries from the indexer +func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { + addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) +} + +// UpdateRepoIndexer update a repository's entries in the indexer +func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { + addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) +} + +func addOperationToQueue(op repoIndexerOperation) { + if !setting.Indexer.RepoIndexerEnabled { + return + } + select { + case repoIndexerOperationQueue <- op: + break + default: + go func() { + repoIndexerOperationQueue <- op + }() + } +} + +// populateRepoIndexerAsynchronously asynchronously populates the repo indexer +// with pre-existing data. This should only be run when the indexer is created +// for the first time. +func populateRepoIndexerAsynchronously() error { + exist, err := models.IsTableNotEmpty("repository") + if err != nil { + return err + } else if !exist { + return nil + } + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { + return err + } + + var maxRepoID int64 + if maxRepoID, err = models.GetMaxID("repository"); err != nil { + return err + } + go populateRepoIndexer(maxRepoID) + return nil +} + +// populateRepoIndexer populate the repo indexer with pre-existing data. This +// should only be run when the indexer is created for the first time. +func populateRepoIndexer(maxRepoID int64) { + log.Info("Populating the repo indexer with existing repositories") + + isShutdown := graceful.GetManager().IsShutdown() + + // start with the maximum existing repo ID and work backwards, so that we + // don't include repos that are created after gitea starts; such repos will + // already be added to the indexer, and we don't need to add them again. + for maxRepoID > 0 { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) + if err != nil { + log.Error("populateRepoIndexer: %v", err) + return + } else if len(ids) == 0 { + break + } + for _, id := range ids { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + repoIndexerOperationQueue <- repoIndexerOperation{ + repoID: id, + deleted: false, + } + maxRepoID = id - 1 + } + } + log.Info("Done (re)populating the repo indexer with existing repositories") +} diff --git a/modules/indexer/code/repo.go b/modules/indexer/code/repo.go index bc5f317b7d4e4..84642f5e8b91b 100644 --- a/modules/indexer/code/repo.go +++ b/modules/indexer/code/repo.go @@ -5,224 +5,10 @@ package code import ( - "context" - "os" - "strings" - "sync" - - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/setting" - "github.com/blevesearch/bleve" - "github.com/blevesearch/bleve/analysis/analyzer/custom" - "github.com/blevesearch/bleve/analysis/token/lowercase" - "github.com/blevesearch/bleve/analysis/tokenizer/unicode" "github.com/blevesearch/bleve/search/query" - "github.com/ethantkoenig/rupture" -) - -const ( - repoIndexerAnalyzer = "repoIndexerAnalyzer" - repoIndexerDocType = "repoIndexerDocType" - - repoIndexerLatestVersion = 4 -) - -type bleveIndexerHolder struct { - index bleve.Index - mutex sync.RWMutex - cond *sync.Cond -} - -func newBleveIndexerHolder() *bleveIndexerHolder { - b := &bleveIndexerHolder{} - b.cond = sync.NewCond(b.mutex.RLocker()) - return b -} - -func (r *bleveIndexerHolder) set(index bleve.Index) { - r.mutex.Lock() - defer r.mutex.Unlock() - r.index = index - r.cond.Broadcast() -} - -func (r *bleveIndexerHolder) get() bleve.Index { - r.mutex.RLock() - defer r.mutex.RUnlock() - if r.index == nil { - r.cond.Wait() - } - return r.index -} - -// repoIndexer (thread-safe) index for repository contents -var indexerHolder = newBleveIndexerHolder() - -// RepoIndexerOp type of operation to perform on repo indexer -type RepoIndexerOp int - -const ( - // RepoIndexerOpUpdate add/update a file's contents - RepoIndexerOpUpdate = iota - - // RepoIndexerOpDelete delete a file - RepoIndexerOpDelete ) -// RepoIndexerData data stored in the repo indexer -type RepoIndexerData struct { - RepoID int64 - Content string -} - -// Type returns the document type, for bleve's mapping.Classifier interface. -func (d *RepoIndexerData) Type() string { - return repoIndexerDocType -} - -// RepoIndexerUpdate an update to the repo indexer -type RepoIndexerUpdate struct { - Filepath string - Op RepoIndexerOp - Data *RepoIndexerData -} - -// AddToFlushingBatch adds the update to the given flushing batch. -func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { - id := filenameIndexerID(update.Data.RepoID, update.Filepath) - switch update.Op { - case RepoIndexerOpUpdate: - return batch.Index(id, update.Data) - case RepoIndexerOpDelete: - return batch.Delete(id) - default: - log.Error("Unrecognized repo indexer op: %d", update.Op) - } - return nil -} - -// initRepoIndexer initialize repo indexer -func initRepoIndexer(populateIndexer func() error) { - indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) - if err != nil { - log.Fatal("InitRepoIndexer %s: %v", setting.Indexer.RepoPath, err) - } - if indexer != nil { - indexerHolder.set(indexer) - closeAtTerminate() - - // Continue population from where left off - if err = populateIndexer(); err != nil { - log.Fatal("PopulateRepoIndex: %v", err) - } - return - } - - if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil { - log.Fatal("CreateRepoIndexer: %v", err) - } - closeAtTerminate() - - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. - if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - log.Fatal("DeleteAllRepoIndexerStatus: %v", err) - } - - if err = populateIndexer(); err != nil { - log.Fatal("PopulateRepoIndex: %v", err) - } -} - -func closeAtTerminate() { - graceful.GetManager().RunAtTerminate(context.Background(), func() { - log.Debug("Closing repo indexer") - indexer := indexerHolder.get() - if indexer != nil { - err := indexer.Close() - if err != nil { - log.Error("Error whilst closing the repository indexer: %v", err) - } - } - log.Info("PID: %d Repository Indexer closed", os.Getpid()) - }) -} - -// createRepoIndexer create a repo indexer if one does not already exist -func createRepoIndexer(path string, latestVersion int) error { - docMapping := bleve.NewDocumentMapping() - numericFieldMapping := bleve.NewNumericFieldMapping() - numericFieldMapping.IncludeInAll = false - docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) - - textFieldMapping := bleve.NewTextFieldMapping() - textFieldMapping.IncludeInAll = false - docMapping.AddFieldMappingsAt("Content", textFieldMapping) - - mapping := bleve.NewIndexMapping() - if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { - return err - } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ - "type": custom.Name, - "char_filters": []string{}, - "tokenizer": unicode.Name, - "token_filters": []string{unicodeNormalizeName, lowercase.Name}, - }); err != nil { - return err - } - mapping.DefaultAnalyzer = repoIndexerAnalyzer - mapping.AddDocumentMapping(repoIndexerDocType, docMapping) - mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - - indexer, err := bleve.New(path, mapping) - if err != nil { - return err - } - indexerHolder.set(indexer) - - return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ - Version: latestVersion, - }) -} - -func filenameIndexerID(repoID int64, filename string) string { - return indexerID(repoID) + "_" + filename -} - -func filenameOfIndexerID(indexerID string) string { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - return indexerID[index+1:] -} - -// RepoIndexerBatch batch to add updates to -func RepoIndexerBatch() rupture.FlushingBatch { - return rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) -} - -// deleteRepoFromIndexer delete all of a repo's files from indexer -func deleteRepoFromIndexer(repoID int64) error { - query := numericEqualityQuery(repoID, "RepoID") - searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := indexerHolder.get().Search(searchRequest) - if err != nil { - return err - } - batch := RepoIndexerBatch() - for _, hit := range result.Hits { - if err = batch.Delete(hit.ID); err != nil { - return err - } - } - return batch.Flush() -} - // RepoSearchResult result of performing a search in a repo type RepoSearchResult struct { RepoID int64 diff --git a/modules/search/search.go b/modules/indexer/code/search.go similarity index 92% rename from modules/search/search.go rename to modules/indexer/code/search.go index 531d95b187e01..18f193a532d9b 100644 --- a/modules/search/search.go +++ b/modules/indexer/code/search.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package search +package code import ( "bytes" @@ -11,7 +11,6 @@ import ( "strings" "code.gitea.io/gitea/modules/highlight" - code_indexer "code.gitea.io/gitea/modules/indexer/code" "code.gitea.io/gitea/modules/util" ) @@ -60,7 +59,7 @@ func writeStrings(buf *bytes.Buffer, strs ...string) error { return nil } -func searchResult(result *code_indexer.RepoSearchResult, startIndex, endIndex int) (*Result, error) { +func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, error) { startLineNum := 1 + strings.Count(result.Content[:startIndex], "\n") var formattedLinesBuffer bytes.Buffer @@ -113,7 +112,7 @@ func PerformSearch(repoIDs []int64, keyword string, page, pageSize int) (int, [] return 0, nil, nil } - total, results, err := code_indexer.SearchRepoByKeyword(repoIDs, keyword, page, pageSize) + total, results, err := indexer.Search(repoIDs, keyword, page, pageSize) if err != nil { return 0, nil, err } diff --git a/routers/home.go b/routers/home.go index 50e1a2b2a4270..4d4bfa56200f7 100644 --- a/routers/home.go +++ b/routers/home.go @@ -12,8 +12,8 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/context" + code_indexer "code.gitea.io/gitea/modules/indexer/code" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/search" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/routers/user" @@ -312,7 +312,7 @@ func ExploreCode(ctx *context.Context) { var ( total int - searchResults []*search.Result + searchResults []*code_indexer.Result ) // if non-admin login user, we need check UnitTypeCode at first @@ -334,14 +334,14 @@ func ExploreCode(ctx *context.Context) { ctx.Data["RepoMaps"] = rightRepoMap - total, searchResults, err = search.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) + total, searchResults, err = code_indexer.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) if err != nil { ctx.ServerError("SearchResults", err) return } // if non-login user or isAdmin, no need to check UnitTypeCode } else if (ctx.User == nil && len(repoIDs) > 0) || isAdmin { - total, searchResults, err = search.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) + total, searchResults, err = code_indexer.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) if err != nil { ctx.ServerError("SearchResults", err) return diff --git a/routers/init.go b/routers/init.go index 01df15d6c5934..41fc203c5635e 100644 --- a/routers/init.go +++ b/routers/init.go @@ -104,7 +104,7 @@ func GlobalInit(ctx context.Context) { // Booting long running goroutines. cron.NewContext() issue_indexer.InitIssueIndexer(false) - code_indexer.InitRepoIndexer() + code_indexer.Init() mirror_service.InitSyncMirrors() webhook.InitDeliverHooks() pull_service.Init() diff --git a/routers/repo/search.go b/routers/repo/search.go index de16eda83d183..50a92c1e36fab 100644 --- a/routers/repo/search.go +++ b/routers/repo/search.go @@ -10,7 +10,7 @@ import ( "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/context" - "code.gitea.io/gitea/modules/search" + code_indexer "code.gitea.io/gitea/modules/indexer/code" "code.gitea.io/gitea/modules/setting" ) @@ -27,7 +27,7 @@ func Search(ctx *context.Context) { if page <= 0 { page = 1 } - total, searchResults, err := search.PerformSearch([]int64{ctx.Repo.Repository.ID}, + total, searchResults, err := code_indexer.PerformSearch([]int64{ctx.Repo.Repository.ID}, keyword, page, setting.UI.RepoSearchPagingNum) if err != nil { ctx.ServerError("SearchResults", err) From c0147128baeed9dbe9b96fac3caf0595474201d4 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Wed, 11 Dec 2019 00:45:40 +0800 Subject: [PATCH 2/9] fix test --- modules/indexer/code/bleve.go | 52 +++++++++++++----------- modules/indexer/code/repo.go | 76 ----------------------------------- 2 files changed, 28 insertions(+), 100 deletions(-) delete mode 100644 modules/indexer/code/repo.go diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index 2ae637b540063..ae193dbba909f 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -295,9 +295,6 @@ func (r *bleveIndexerHolder) get() bleve.Index { return r.index } -// repoIndexer (thread-safe) index for repository contents -var indexerHolder = newBleveIndexerHolder() - // RepoIndexerOp type of operation to perform on repo indexer type RepoIndexerOp int @@ -342,7 +339,7 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) } // createRepoIndexer create a repo indexer if one does not already exist -func createRepoIndexer(path string, latestVersion int) error { +func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) { docMapping := bleve.NewDocumentMapping() numericFieldMapping := bleve.NewNumericFieldMapping() numericFieldMapping.IncludeInAll = false @@ -354,14 +351,14 @@ func createRepoIndexer(path string, latestVersion int) error { mapping := bleve.NewIndexMapping() if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { - return err + return nil, err } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ "type": custom.Name, "char_filters": []string{}, "tokenizer": unicode.Name, "token_filters": []string{unicodeNormalizeName, lowercase.Name}, }); err != nil { - return err + return nil, err } mapping.DefaultAnalyzer = repoIndexerAnalyzer mapping.AddDocumentMapping(repoIndexerDocType, docMapping) @@ -369,13 +366,15 @@ func createRepoIndexer(path string, latestVersion int) error { indexer, err := bleve.New(path, mapping) if err != nil { - return err + return nil, err } - indexerHolder.set(indexer) - return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ + if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ Version: latestVersion, - }) + }); err != nil { + return nil, err + } + return indexer, nil } func filenameIndexerID(repoID int64, filename string) string { @@ -396,14 +395,15 @@ var ( // BleveIndexer represents a bleve indexer implementation type BleveIndexer struct { - indexDir string - indexer bleve.Index // indexer (thread-safe) index for repository contents + indexDir string + indexerHolder *bleveIndexerHolder } // NewBleveIndexer creates a new bleve local indexer func NewBleveIndexer(indexDir string) *BleveIndexer { return &BleveIndexer{ - indexDir: indexDir, + indexDir: indexDir, + indexerHolder: newBleveIndexerHolder(), } } @@ -414,20 +414,24 @@ func (b *BleveIndexer) Init() (bool, error) { log.Fatal("openIndexer: %v", err) } if indexer != nil { - indexerHolder.set(indexer) - closeAtTerminate() + b.indexerHolder.set(indexer) + b.closeAtTerminate() return false, nil } - err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) - closeAtTerminate() - return err == nil, err + indexer, err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) + if err != nil { + return false, err + } + b.indexerHolder.set(indexer) + b.closeAtTerminate() + return true, nil } -func closeAtTerminate() { +func (b *BleveIndexer) closeAtTerminate() { graceful.GetManager().RunAtTerminate(context.Background(), func() { log.Debug("Closing repo indexer") - indexer := indexerHolder.get() + indexer := b.indexerHolder.get() if indexer != nil { err := indexer.Close() if err != nil { @@ -456,7 +460,7 @@ func (b *BleveIndexer) Index(repoID int64) error { return nil } - batch := rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) + batch := rupture.NewFlushingBatch(b.indexerHolder.get(), maxBatchSize) for _, update := range changes.Updates { if err := addUpdate(update, repo, batch); err != nil { return err @@ -477,11 +481,11 @@ func (b *BleveIndexer) Index(repoID int64) error { func (b *BleveIndexer) Delete(repoID int64) error { query := numericEqualityQuery(repoID, "RepoID") searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := indexerHolder.get().Search(searchRequest) + result, err := b.indexerHolder.get().Search(searchRequest) if err != nil { return err } - batch := rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) + batch := rupture.NewFlushingBatch(b.indexerHolder.get(), maxBatchSize) for _, hit := range result.Hits { if err = batch.Delete(hit.ID); err != nil { return err @@ -517,7 +521,7 @@ func (b *BleveIndexer) Search(repoIDs []int64, keyword string, page, pageSize in searchRequest.Fields = []string{"Content", "RepoID"} searchRequest.IncludeLocations = true - result, err := indexerHolder.get().Search(searchRequest) + result, err := b.indexerHolder.get().Search(searchRequest) if err != nil { return 0, nil, err } diff --git a/modules/indexer/code/repo.go b/modules/indexer/code/repo.go deleted file mode 100644 index 84642f5e8b91b..0000000000000 --- a/modules/indexer/code/repo.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2017 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package code - -import ( - "github.com/blevesearch/bleve" - "github.com/blevesearch/bleve/search/query" -) - -// RepoSearchResult result of performing a search in a repo -type RepoSearchResult struct { - RepoID int64 - StartIndex int - EndIndex int - Filename string - Content string -} - -// SearchRepoByKeyword searches for files in the specified repo. -// Returns the matching file-paths -func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (int64, []*RepoSearchResult, error) { - phraseQuery := bleve.NewMatchPhraseQuery(keyword) - phraseQuery.FieldVal = "Content" - phraseQuery.Analyzer = repoIndexerAnalyzer - - var indexerQuery query.Query - if len(repoIDs) > 0 { - var repoQueries = make([]query.Query, 0, len(repoIDs)) - for _, repoID := range repoIDs { - repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID")) - } - - indexerQuery = bleve.NewConjunctionQuery( - bleve.NewDisjunctionQuery(repoQueries...), - phraseQuery, - ) - } else { - indexerQuery = phraseQuery - } - - from := (page - 1) * pageSize - searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false) - searchRequest.Fields = []string{"Content", "RepoID"} - searchRequest.IncludeLocations = true - - result, err := indexerHolder.get().Search(searchRequest) - if err != nil { - return 0, nil, err - } - - searchResults := make([]*RepoSearchResult, len(result.Hits)) - for i, hit := range result.Hits { - var startIndex, endIndex int = -1, -1 - for _, locations := range hit.Locations["Content"] { - location := locations[0] - locationStart := int(location.Start) - locationEnd := int(location.End) - if startIndex < 0 || locationStart < startIndex { - startIndex = locationStart - } - if endIndex < 0 || locationEnd > endIndex { - endIndex = locationEnd - } - } - searchResults[i] = &RepoSearchResult{ - RepoID: int64(hit.Fields["RepoID"].(float64)), - StartIndex: startIndex, - EndIndex: endIndex, - Filename: filenameOfIndexerID(hit.ID), - Content: hit.Fields["Content"].(string), - } - } - return int64(result.Total), searchResults, nil -} From 1251821cde5ccf7fb142ecacff5ed938b86d4986 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Wed, 11 Dec 2019 10:49:39 +0800 Subject: [PATCH 3/9] fix test --- modules/indexer/code/bleve_test.go | 26 ++++++++------------------ modules/setting/indexer.go | 2 ++ 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go index cf00e4f376113..af80b2d5c3558 100644 --- a/modules/indexer/code/bleve_test.go +++ b/modules/indexer/code/bleve_test.go @@ -18,14 +18,16 @@ func TestMain(m *testing.M) { } func TestIndexAndSearch(t *testing.T) { + models.PrepareTestEnv(t) + dir := "./bleve.index" - indexer := NewBleveIndexer(dir) defer os.RemoveAll(dir) + indexer := NewBleveIndexer(dir) _, err := indexer.Init() assert.NoError(t, err) - err = indexer.Index(2) + err = indexer.Index(1) assert.NoError(t, err) var ( @@ -34,34 +36,22 @@ func TestIndexAndSearch(t *testing.T) { IDs []int64 }{ { - Keyword: "search", - IDs: []int64{1}, - }, - { - Keyword: "test1", + Keyword: "Description", IDs: []int64{1}, }, { - Keyword: "test2", + Keyword: "repo1", IDs: []int64{1}, }, { - Keyword: "support", - IDs: []int64{1, 2}, - }, - { - Keyword: "chinese", - IDs: []int64{1, 2}, - }, - { - Keyword: "help", + Keyword: "non-exist", IDs: []int64{}, }, } ) for _, kw := range keywords { - total, res, err := indexer.Search(kw.IDs, kw.Keyword, 1, 10) + total, res, err := indexer.Search(nil, kw.Keyword, 1, 10) assert.NoError(t, err) assert.EqualValues(t, len(kw.IDs), total) diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go index fbaef3fcf2218..589e7bf86475c 100644 --- a/modules/setting/indexer.go +++ b/modules/setting/indexer.go @@ -45,6 +45,8 @@ var ( IssueQueueDir: "indexers/issues.queue", IssueQueueConnStr: "", IssueQueueBatchNumber: 20, + + MaxIndexerFileSize: 1024 * 1024, } ) From f0716780d7de0f78fbb352bc862706260bcd2622 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Wed, 11 Dec 2019 11:05:39 +0800 Subject: [PATCH 4/9] refactor code indexer --- modules/indexer/code/bleve.go | 210 +++------------------------------- modules/indexer/code/git.go | 147 ++++++++++++++++++++++++ 2 files changed, 165 insertions(+), 192 deletions(-) create mode 100644 modules/indexer/code/git.go diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index ae193dbba909f..fc686f71e11f6 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -30,6 +30,9 @@ import ( "github.com/ethantkoenig/rupture" ) +const unicodeNormalizeName = "unicodeNormalize" +const maxBatchSize = 16 + // indexerID a bleve-compatible unique identifier for an integer id func indexerID(id int64) string { return strconv.FormatInt(id, 36) @@ -44,8 +47,6 @@ func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { return q } -const unicodeNormalizeName = "unicodeNormalize" - func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ "type": unicodenorm.Name, @@ -53,8 +54,6 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { }) } -const maxBatchSize = 16 - // openIndexer open the index at the specified path, checking for metadata // updates and bleve version updates. If index needs to be created (or // re-created), returns (nil, nil) @@ -87,35 +86,15 @@ func openIndexer(path string, latestVersion int) (bleve.Index, error) { return index, nil } -// repoChanges changes (file additions/updates/removals) to a repo -type repoChanges struct { - Updates []fileUpdate - RemovedFilenames []string -} - -type fileUpdate struct { - Filename string - BlobSha string -} - -func getDefaultBranchSha(repo *models.Repository) (string, error) { - stdout, err := git.NewCommand("show-ref", "-s", git.BranchPrefix+repo.DefaultBranch).RunInDir(repo.RepoPath()) - if err != nil { - return "", err - } - return strings.TrimSpace(stdout), nil +// RepoIndexerData data stored in the repo indexer +type RepoIndexerData struct { + RepoID int64 + Content string } -// getRepoChanges returns changes to repo since last indexer update -func getRepoChanges(repo *models.Repository, revision string) (*repoChanges, error) { - if err := repo.GetIndexerStatus(); err != nil { - return nil, err - } - - if len(repo.IndexerStatus.CommitSha) == 0 { - return genesisChanges(repo, revision) - } - return nonGenesisChanges(repo, revision) +// Type returns the document type, for bleve's mapping.Classifier interface. +func (d *RepoIndexerData) Type() string { + return repoIndexerDocType } func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { @@ -138,127 +117,17 @@ func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.Flushin // FIXME: UTF-16 files will probably fail here return nil } - indexerUpdate := RepoIndexerUpdate{ - Filepath: update.Filename, - Op: RepoIndexerOpUpdate, - Data: &RepoIndexerData{ - RepoID: repo.ID, - Content: string(charset.ToUTF8DropErrors(fileContents)), - }, - } - return indexerUpdate.AddToFlushingBatch(batch) -} - -func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { - indexerUpdate := RepoIndexerUpdate{ - Filepath: filename, - Op: RepoIndexerOpDelete, - Data: &RepoIndexerData{ - RepoID: repo.ID, - }, - } - return indexerUpdate.AddToFlushingBatch(batch) -} - -func isIndexable(entry *git.TreeEntry) bool { - if !entry.IsRegular() && !entry.IsExecutable() { - return false - } - name := strings.ToLower(entry.Name()) - for _, g := range setting.Indexer.ExcludePatterns { - if g.Match(name) { - return false - } - } - for _, g := range setting.Indexer.IncludePatterns { - if g.Match(name) { - return true - } - } - return len(setting.Indexer.IncludePatterns) == 0 -} - -// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command -func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { - entries, err := git.ParseTreeEntries(stdout) - if err != nil { - return nil, err - } - var idxCount = 0 - updates := make([]fileUpdate, len(entries)) - for _, entry := range entries { - if isIndexable(entry) { - updates[idxCount] = fileUpdate{ - Filename: entry.Name(), - BlobSha: entry.ID.String(), - } - idxCount++ - } - } - return updates[:idxCount], nil -} -// genesisChanges get changes to add repo to the indexer for the first time -func genesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { - var changes repoChanges - stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision). - RunInDirBytes(repo.RepoPath()) - if err != nil { - return nil, err - } - changes.Updates, err = parseGitLsTreeOutput(stdout) - return &changes, err + id := filenameIndexerID(repo.ID, update.Filename) + return batch.Index(id, &RepoIndexerData{ + RepoID: repo.ID, + Content: string(charset.ToUTF8DropErrors(fileContents)), + }) } -// nonGenesisChanges get changes since the previous indexer update -func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { - diffCmd := git.NewCommand("diff", "--name-status", - repo.IndexerStatus.CommitSha, revision) - stdout, err := diffCmd.RunInDir(repo.RepoPath()) - if err != nil { - // previous commit sha may have been removed by a force push, so - // try rebuilding from scratch - log.Warn("git diff: %v", err) - if err = indexer.Delete(repo.ID); err != nil { - return nil, err - } - return genesisChanges(repo, revision) - } - var changes repoChanges - updatedFilenames := make([]string, 0, 10) - for _, line := range strings.Split(stdout, "\n") { - line = strings.TrimSpace(line) - if len(line) == 0 { - continue - } - filename := strings.TrimSpace(line[1:]) - if len(filename) == 0 { - continue - } else if filename[0] == '"' { - filename, err = strconv.Unquote(filename) - if err != nil { - return nil, err - } - } - - switch status := line[0]; status { - case 'M', 'A': - updatedFilenames = append(updatedFilenames, filename) - case 'D': - changes.RemovedFilenames = append(changes.RemovedFilenames, filename) - default: - log.Warn("Unrecognized status: %c (line=%s)", status, line) - } - } - - cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--") - cmd.AddArguments(updatedFilenames...) - lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath()) - if err != nil { - return nil, err - } - changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout) - return &changes, err +func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { + id := filenameIndexerID(repo.ID, filename) + return batch.Delete(id) } const ( @@ -295,49 +164,6 @@ func (r *bleveIndexerHolder) get() bleve.Index { return r.index } -// RepoIndexerOp type of operation to perform on repo indexer -type RepoIndexerOp int - -const ( - // RepoIndexerOpUpdate add/update a file's contents - RepoIndexerOpUpdate = iota - - // RepoIndexerOpDelete delete a file - RepoIndexerOpDelete -) - -// RepoIndexerData data stored in the repo indexer -type RepoIndexerData struct { - RepoID int64 - Content string -} - -// Type returns the document type, for bleve's mapping.Classifier interface. -func (d *RepoIndexerData) Type() string { - return repoIndexerDocType -} - -// RepoIndexerUpdate an update to the repo indexer -type RepoIndexerUpdate struct { - Filepath string - Op RepoIndexerOp - Data *RepoIndexerData -} - -// AddToFlushingBatch adds the update to the given flushing batch. -func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { - id := filenameIndexerID(update.Data.RepoID, update.Filepath) - switch update.Op { - case RepoIndexerOpUpdate: - return batch.Index(id, update.Data) - case RepoIndexerOpDelete: - return batch.Delete(id) - default: - log.Error("Unrecognized repo indexer op: %d", update.Op) - } - return nil -} - // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) { docMapping := bleve.NewDocumentMapping() diff --git a/modules/indexer/code/git.go b/modules/indexer/code/git.go new file mode 100644 index 0000000000000..bfa7d20438ef8 --- /dev/null +++ b/modules/indexer/code/git.go @@ -0,0 +1,147 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package code + +import ( + "strconv" + "strings" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +type fileUpdate struct { + Filename string + BlobSha string +} + +// repoChanges changes (file additions/updates/removals) to a repo +type repoChanges struct { + Updates []fileUpdate + RemovedFilenames []string +} + +func getDefaultBranchSha(repo *models.Repository) (string, error) { + stdout, err := git.NewCommand("show-ref", "-s", git.BranchPrefix+repo.DefaultBranch).RunInDir(repo.RepoPath()) + if err != nil { + return "", err + } + return strings.TrimSpace(stdout), nil +} + +// getRepoChanges returns changes to repo since last indexer update +func getRepoChanges(repo *models.Repository, revision string) (*repoChanges, error) { + if err := repo.GetIndexerStatus(); err != nil { + return nil, err + } + + if len(repo.IndexerStatus.CommitSha) == 0 { + return genesisChanges(repo, revision) + } + return nonGenesisChanges(repo, revision) +} + +func isIndexable(entry *git.TreeEntry) bool { + if !entry.IsRegular() && !entry.IsExecutable() { + return false + } + name := strings.ToLower(entry.Name()) + for _, g := range setting.Indexer.ExcludePatterns { + if g.Match(name) { + return false + } + } + for _, g := range setting.Indexer.IncludePatterns { + if g.Match(name) { + return true + } + } + return len(setting.Indexer.IncludePatterns) == 0 +} + +// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command +func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { + entries, err := git.ParseTreeEntries(stdout) + if err != nil { + return nil, err + } + var idxCount = 0 + updates := make([]fileUpdate, len(entries)) + for _, entry := range entries { + if isIndexable(entry) { + updates[idxCount] = fileUpdate{ + Filename: entry.Name(), + BlobSha: entry.ID.String(), + } + idxCount++ + } + } + return updates[:idxCount], nil +} + +// genesisChanges get changes to add repo to the indexer for the first time +func genesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { + var changes repoChanges + stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision). + RunInDirBytes(repo.RepoPath()) + if err != nil { + return nil, err + } + changes.Updates, err = parseGitLsTreeOutput(stdout) + return &changes, err +} + +// nonGenesisChanges get changes since the previous indexer update +func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { + diffCmd := git.NewCommand("diff", "--name-status", + repo.IndexerStatus.CommitSha, revision) + stdout, err := diffCmd.RunInDir(repo.RepoPath()) + if err != nil { + // previous commit sha may have been removed by a force push, so + // try rebuilding from scratch + log.Warn("git diff: %v", err) + if err = indexer.Delete(repo.ID); err != nil { + return nil, err + } + return genesisChanges(repo, revision) + } + var changes repoChanges + updatedFilenames := make([]string, 0, 10) + for _, line := range strings.Split(stdout, "\n") { + line = strings.TrimSpace(line) + if len(line) == 0 { + continue + } + filename := strings.TrimSpace(line[1:]) + if len(filename) == 0 { + continue + } else if filename[0] == '"' { + filename, err = strconv.Unquote(filename) + if err != nil { + return nil, err + } + } + + switch status := line[0]; status { + case 'M', 'A': + updatedFilenames = append(updatedFilenames, filename) + case 'D': + changes.RemovedFilenames = append(changes.RemovedFilenames, filename) + default: + log.Warn("Unrecognized status: %c (line=%s)", status, line) + } + } + + cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--") + cmd.AddArguments(updatedFilenames...) + lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath()) + if err != nil { + return nil, err + } + changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout) + return &changes, err +} From bdfe1228f81c9bc784694717bd7e3a9101a3af00 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Wed, 11 Dec 2019 11:07:30 +0800 Subject: [PATCH 5/9] fix import --- modules/indexer/code/bleve.go | 1 + modules/indexer/code/bleve_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index fc686f71e11f6..8827f7e53c116 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -19,6 +19,7 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" + "github.com/blevesearch/bleve" "github.com/blevesearch/bleve/analysis/analyzer/custom" "github.com/blevesearch/bleve/analysis/token/lowercase" diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go index af80b2d5c3558..a0fd0b6b15167 100644 --- a/modules/indexer/code/bleve_test.go +++ b/modules/indexer/code/bleve_test.go @@ -10,6 +10,7 @@ import ( "testing" "code.gitea.io/gitea/models" + "github.com/stretchr/testify/assert" ) From 214aac1e8692b33b09dbdc0d719417c61b3fa529 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Wed, 11 Dec 2019 11:34:52 +0800 Subject: [PATCH 6/9] improve code --- modules/indexer/code/indexer.go | 13 +++++++------ modules/indexer/code/queue.go | 30 ++++++++++++------------------ 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index d25a9e79f14ea..11bd43b88d597 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -38,8 +38,8 @@ func Init() { if !setting.Indexer.RepoIndexerEnabled { return } + waitChannel := make(chan time.Duration) - repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) go func() { start := time.Now() log.Info("Initializing Repository Indexer") @@ -48,14 +48,16 @@ func Init() { if err != nil { log.Fatal("indexer.Init: %v", err) } + + go processRepoIndexerOperationQueue() + if created { - if err := populateRepoIndexerAsynchronously(); err != nil { - log.Fatal("PopulateRepoIndex: %v", err) - } + go populateRepoIndexer() } - go processRepoIndexerOperationQueue() + waitChannel <- time.Since(start) }() + if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout @@ -69,6 +71,5 @@ func Init() { log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) } }() - } } diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go index 9a4de46ff54c8..a89435781390a 100644 --- a/modules/indexer/code/queue.go +++ b/modules/indexer/code/queue.go @@ -22,6 +22,7 @@ type repoIndexerOperation struct { var repoIndexerOperationQueue chan repoIndexerOperation func processRepoIndexerOperationQueue() { + repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) for { select { case op := <-repoIndexerOperationQueue: @@ -69,38 +70,31 @@ func addOperationToQueue(op repoIndexerOperation) { } } -// populateRepoIndexerAsynchronously asynchronously populates the repo indexer -// with pre-existing data. This should only be run when the indexer is created -// for the first time. -func populateRepoIndexerAsynchronously() error { +// populateRepoIndexer populate the repo indexer with pre-existing data. This +// should only be run when the indexer is created for the first time. +func populateRepoIndexer() { + log.Info("Populating the repo indexer with existing repositories") + + isShutdown := graceful.GetManager().IsShutdown() + exist, err := models.IsTableNotEmpty("repository") if err != nil { - return err + log.Fatal("System error: %v", err) } else if !exist { - return nil + return } // if there is any existing repo indexer metadata in the DB, delete it // since we are starting afresh. Also, xorm requires deletes to have a // condition, and we want to delete everything, thus 1=1. if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - return err + log.Fatal("System error: %v", err) } var maxRepoID int64 if maxRepoID, err = models.GetMaxID("repository"); err != nil { - return err + log.Fatal("System error: %v", err) } - go populateRepoIndexer(maxRepoID) - return nil -} - -// populateRepoIndexer populate the repo indexer with pre-existing data. This -// should only be run when the indexer is created for the first time. -func populateRepoIndexer(maxRepoID int64) { - log.Info("Populating the repo indexer with existing repositories") - - isShutdown := graceful.GetManager().IsShutdown() // start with the maximum existing repo ID and work backwards, so that we // don't include repos that are created after gitea starts; such repos will From 0de12a28495dff529c0df8476be4a3a259cb0102 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Sun, 15 Dec 2019 16:23:29 +0800 Subject: [PATCH 7/9] fix typo --- modules/indexer/code/indexer.go | 2 +- modules/indexer/issues/db.go | 2 +- modules/indexer/issues/indexer.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 11bd43b88d597..d446357f0eb64 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -25,7 +25,7 @@ type SearchResult struct { Content string } -// Indexer defines an inteface to indexer issues contents +// Indexer defines an interface to indexer issues contents type Indexer interface { Init() (bool, error) Index(repoID int64) error diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go index 7d4e3894712ae..a758cfeaeebdd 100644 --- a/modules/indexer/issues/db.go +++ b/modules/indexer/issues/db.go @@ -6,7 +6,7 @@ package issues import "code.gitea.io/gitea/models" -// DBIndexer implements Indexer inteface to use database's like search +// DBIndexer implements Indexer interface to use database's like search type DBIndexer struct { } diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 50b8d6d22459f..ebcd3f68dd51c 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -38,7 +38,7 @@ type SearchResult struct { Hits []Match } -// Indexer defines an inteface to indexer issues contents +// Indexer defines an interface to indexer issues contents type Indexer interface { Init() (bool, error) Index(issue []*IndexerData) error From 60ed63c99d85505876d9f05d774346efe5c371cd Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Wed, 18 Dec 2019 23:32:47 +0800 Subject: [PATCH 8/9] fix test and make code clean --- modules/indexer/code/bleve.go | 89 +++++++++--------------------- modules/indexer/code/bleve_test.go | 17 ++++-- modules/indexer/code/indexer.go | 10 ++-- modules/indexer/code/queue.go | 4 +- 4 files changed, 47 insertions(+), 73 deletions(-) diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index 8827f7e53c116..c250936d95e71 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -5,18 +5,15 @@ package code import ( - "context" "fmt" "os" "strconv" "strings" - "sync" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" - "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -59,7 +56,7 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { // updates and bleve version updates. If index needs to be created (or // re-created), returns (nil, nil) func openIndexer(path string, latestVersion int) (bleve.Index, error) { - _, err := os.Stat(setting.Indexer.IssuePath) + _, err := os.Stat(path) if err != nil && os.IsNotExist(err) { return nil, nil } else if err != nil { @@ -137,34 +134,6 @@ const ( repoIndexerLatestVersion = 4 ) -type bleveIndexerHolder struct { - index bleve.Index - mutex sync.RWMutex - cond *sync.Cond -} - -func newBleveIndexerHolder() *bleveIndexerHolder { - b := &bleveIndexerHolder{} - b.cond = sync.NewCond(b.mutex.RLocker()) - return b -} - -func (r *bleveIndexerHolder) set(index bleve.Index) { - r.mutex.Lock() - defer r.mutex.Unlock() - r.index = index - r.cond.Broadcast() -} - -func (r *bleveIndexerHolder) get() bleve.Index { - r.mutex.RLock() - defer r.mutex.RUnlock() - if r.index == nil { - r.cond.Wait() - } - return r.index -} - // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) { docMapping := bleve.NewDocumentMapping() @@ -222,51 +191,47 @@ var ( // BleveIndexer represents a bleve indexer implementation type BleveIndexer struct { - indexDir string - indexerHolder *bleveIndexerHolder + indexDir string + indexer bleve.Index } // NewBleveIndexer creates a new bleve local indexer -func NewBleveIndexer(indexDir string) *BleveIndexer { - return &BleveIndexer{ - indexDir: indexDir, - indexerHolder: newBleveIndexerHolder(), +func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { + indexer := &BleveIndexer{ + indexDir: indexDir, } + created, err := indexer.init() + return indexer, created, err } // Init init the indexer -func (b *BleveIndexer) Init() (bool, error) { - indexer, err := openIndexer(b.indexDir, repoIndexerLatestVersion) +func (b *BleveIndexer) init() (bool, error) { + var err error + b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion) if err != nil { - log.Fatal("openIndexer: %v", err) + return false, err } - if indexer != nil { - b.indexerHolder.set(indexer) - b.closeAtTerminate() + if b.indexer != nil { return false, nil } - indexer, err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) + b.indexer, err = createRepoIndexer(b.indexDir, repoIndexerLatestVersion) if err != nil { return false, err } - b.indexerHolder.set(indexer) - b.closeAtTerminate() + return true, nil } -func (b *BleveIndexer) closeAtTerminate() { - graceful.GetManager().RunAtTerminate(context.Background(), func() { - log.Debug("Closing repo indexer") - indexer := b.indexerHolder.get() - if indexer != nil { - err := indexer.Close() - if err != nil { - log.Error("Error whilst closing the repository indexer: %v", err) - } +func (b *BleveIndexer) Close() { + log.Debug("Closing repo indexer") + if b.indexer != nil { + err := b.indexer.Close() + if err != nil { + log.Error("Error whilst closing the repository indexer: %v", err) } - log.Info("PID: %d Repository Indexer closed", os.Getpid()) - }) + } + log.Info("PID: %d Repository Indexer closed", os.Getpid()) } // Index indexes the data @@ -287,7 +252,7 @@ func (b *BleveIndexer) Index(repoID int64) error { return nil } - batch := rupture.NewFlushingBatch(b.indexerHolder.get(), maxBatchSize) + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) for _, update := range changes.Updates { if err := addUpdate(update, repo, batch); err != nil { return err @@ -308,11 +273,11 @@ func (b *BleveIndexer) Index(repoID int64) error { func (b *BleveIndexer) Delete(repoID int64) error { query := numericEqualityQuery(repoID, "RepoID") searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := b.indexerHolder.get().Search(searchRequest) + result, err := b.indexer.Search(searchRequest) if err != nil { return err } - batch := rupture.NewFlushingBatch(b.indexerHolder.get(), maxBatchSize) + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) for _, hit := range result.Hits { if err = batch.Delete(hit.ID); err != nil { return err @@ -348,7 +313,7 @@ func (b *BleveIndexer) Search(repoIDs []int64, keyword string, page, pageSize in searchRequest.Fields = []string{"Content", "RepoID"} searchRequest.IncludeLocations = true - result, err := b.indexerHolder.get().Search(searchRequest) + result, err := b.indexer.Search(searchRequest) if err != nil { return 0, nil, err } diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go index a0fd0b6b15167..ac2b4119983ae 100644 --- a/modules/indexer/code/bleve_test.go +++ b/modules/indexer/code/bleve_test.go @@ -10,6 +10,8 @@ import ( "testing" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" "github.com/stretchr/testify/assert" ) @@ -22,13 +24,16 @@ func TestIndexAndSearch(t *testing.T) { models.PrepareTestEnv(t) dir := "./bleve.index" - defer os.RemoveAll(dir) - indexer := NewBleveIndexer(dir) + os.RemoveAll(dir) - _, err := indexer.Init() - assert.NoError(t, err) + setting.Indexer.RepoIndexerEnabled = true + idx, _, err := NewBleveIndexer(dir) + if err != nil { + idx.Close() + log.Fatal("indexer.Init: %v", err) + } - err = indexer.Index(1) + err = idx.Index(1) assert.NoError(t, err) var ( @@ -52,7 +57,7 @@ func TestIndexAndSearch(t *testing.T) { ) for _, kw := range keywords { - total, res, err := indexer.Search(nil, kw.Keyword, 1, 10) + total, res, err := idx.Search(nil, kw.Keyword, 1, 10) assert.NoError(t, err) assert.EqualValues(t, len(kw.IDs), total) diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index d446357f0eb64..c68c7c2d71c3f 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -27,10 +27,10 @@ type SearchResult struct { // Indexer defines an interface to indexer issues contents type Indexer interface { - Init() (bool, error) Index(repoID int64) error Delete(repoID int64) error Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error) + Close() } // Init initialize the repo indexer @@ -43,13 +43,15 @@ func Init() { go func() { start := time.Now() log.Info("Initializing Repository Indexer") - indexer = NewBleveIndexer(setting.Indexer.RepoPath) - created, err := indexer.Init() + var created bool + var err error + indexer, created, err = NewBleveIndexer(setting.Indexer.RepoPath) if err != nil { + indexer.Close() log.Fatal("indexer.Init: %v", err) } - go processRepoIndexerOperationQueue() + go processRepoIndexerOperationQueue(indexer) if created { go populateRepoIndexer() diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go index a89435781390a..90f7d1bb19110 100644 --- a/modules/indexer/code/queue.go +++ b/modules/indexer/code/queue.go @@ -21,7 +21,9 @@ type repoIndexerOperation struct { var repoIndexerOperationQueue chan repoIndexerOperation -func processRepoIndexerOperationQueue() { +func processRepoIndexerOperationQueue(indexer Indexer) { + defer indexer.Close() + repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) for { select { From 42ad599143d852a186e395582e7d244acbbe8915 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Wed, 18 Dec 2019 23:41:53 +0800 Subject: [PATCH 9/9] fix lint --- modules/indexer/code/bleve.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index c250936d95e71..339dca74a12a1 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -204,7 +204,7 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { return indexer, created, err } -// Init init the indexer +// init init the indexer func (b *BleveIndexer) init() (bool, error) { var err error b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion) @@ -223,6 +223,7 @@ func (b *BleveIndexer) init() (bool, error) { return true, nil } +// Close close the indexer func (b *BleveIndexer) Close() { log.Debug("Closing repo indexer") if b.indexer != nil {