Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Commit

Permalink
search: extract progress from graphqlbackend (#17373)
Browse files Browse the repository at this point in the history
This is a pure refactor to extract code related to progress updates from graphqlbackend.
  • Loading branch information
stefanhengl authored Jan 19, 2021
1 parent 57ad84d commit b32224c
Show file tree
Hide file tree
Showing 22 changed files with 427 additions and 363 deletions.
5 changes: 3 additions & 2 deletions cmd/frontend/graphqlbackend/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
searchbackend "github.com/sourcegraph/sourcegraph/internal/search/backend"
"github.com/sourcegraph/sourcegraph/internal/search/query"
querytypes "github.com/sourcegraph/sourcegraph/internal/search/query/types"
"github.com/sourcegraph/sourcegraph/internal/search/streaming"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/internal/vcs"
"github.com/sourcegraph/sourcegraph/schema"
Expand Down Expand Up @@ -577,7 +578,7 @@ func sortSearchSuggestions(s []*searchSuggestionResolver) {
// returning common as to reflect that new information. If searchErr is a fatal error,
// it returns a non-nil error; otherwise, if searchErr == nil or a non-fatal error, it returns a
// nil error.
func handleRepoSearchResult(repoRev *search.RepositoryRevisions, limitHit, timedOut bool, searchErr error) (_ SearchResultsCommon, fatalErr error) {
func handleRepoSearchResult(repoRev *search.RepositoryRevisions, limitHit, timedOut bool, searchErr error) (_ streaming.Stats, fatalErr error) {
var status search.RepoStatus
if limitHit {
status |= search.RepoStatusLimitHit
Expand All @@ -604,7 +605,7 @@ func handleRepoSearchResult(repoRev *search.RepositoryRevisions, limitHit, timed
} else {
status |= search.RepoStatusSearched
}
return SearchResultsCommon{
return streaming.Stats{
Status: search.RepoStatusSingleton(repoRev.Repo.ID, status),
IsLimitHit: limitHit,
}, fatalErr
Expand Down
11 changes: 6 additions & 5 deletions cmd/frontend/graphqlbackend/search_commits.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/errcode"
"github.com/sourcegraph/sourcegraph/internal/search"
"github.com/sourcegraph/sourcegraph/internal/search/query"
"github.com/sourcegraph/sourcegraph/internal/search/streaming"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/internal/vcs/git"
)
Expand Down Expand Up @@ -525,7 +526,7 @@ type searchCommitsInReposParameters struct {
ResultChannel chan<- []SearchResultResolver
}

func searchCommitsInRepos(ctx context.Context, args *search.TextParametersForCommitParameters, params searchCommitsInReposParameters) ([]SearchResultResolver, *SearchResultsCommon, error) {
func searchCommitsInRepos(ctx context.Context, args *search.TextParametersForCommitParameters, params searchCommitsInReposParameters) ([]SearchResultResolver, *streaming.Stats, error) {
var err error
tr, ctx := trace.New(ctx, params.TraceName, fmt.Sprintf("query: %+v, numRepoRevs: %d", args.PatternInfo, len(args.Repos)))
defer func() {
Expand Down Expand Up @@ -559,7 +560,7 @@ func searchCommitsInRepos(ctx context.Context, args *search.TextParametersForCom
wg sync.WaitGroup
mu sync.Mutex
unflattened [][]*CommitSearchResultResolver
common = &SearchResultsCommon{}
common = &streaming.Stats{}
)
for _, repoRev := range args.Repos {
// Skip the repo if no revisions were resolved for it
Expand Down Expand Up @@ -587,7 +588,7 @@ func searchCommitsInRepos(ctx context.Context, args *search.TextParametersForCom
err = errors.Wrapf(searchErr, "failed to search commit %s %s", params.ErrorName, repoRev.String())
cancel()
}
common.update(&repoCommon)
common.Update(&repoCommon)
if len(results) > 0 {
unflattened = append(unflattened, results)
}
Expand All @@ -606,7 +607,7 @@ func searchCommitsInRepos(ctx context.Context, args *search.TextParametersForCom
}

// searchCommitDiffsInRepos searches a set of repos for matching commit diffs.
func searchCommitDiffsInRepos(ctx context.Context, args *search.TextParametersForCommitParameters, resultChannel chan<- []SearchResultResolver) ([]SearchResultResolver, *SearchResultsCommon, error) {
func searchCommitDiffsInRepos(ctx context.Context, args *search.TextParametersForCommitParameters, resultChannel chan<- []SearchResultResolver) ([]SearchResultResolver, *streaming.Stats, error) {
return searchCommitsInRepos(ctx, args, searchCommitsInReposParameters{
TraceName: "searchCommitDiffsInRepos",
ErrorName: "diffs",
Expand All @@ -620,7 +621,7 @@ func searchCommitDiffsInRepos(ctx context.Context, args *search.TextParametersFo
}

// searchCommitLogInRepos searches a set of repos for matching commits.
func searchCommitLogInRepos(ctx context.Context, args *search.TextParametersForCommitParameters, resultChannel chan<- []SearchResultResolver) ([]SearchResultResolver, *SearchResultsCommon, error) {
func searchCommitLogInRepos(ctx context.Context, args *search.TextParametersForCommitParameters, resultChannel chan<- []SearchResultResolver) ([]SearchResultResolver, *streaming.Stats, error) {
var terms []string
if args.PatternInfo.Pattern != "" {
terms = append(terms, args.PatternInfo.Pattern)
Expand Down
49 changes: 25 additions & 24 deletions cmd/frontend/graphqlbackend/search_pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/db"
"github.com/sourcegraph/sourcegraph/internal/search"
"github.com/sourcegraph/sourcegraph/internal/search/streaming"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/internal/types"
)
Expand Down Expand Up @@ -175,12 +176,12 @@ func (r *searchResolver) paginatedResults(ctx context.Context) (result *SearchRe
return repoIsLess(resolved.RepoRevs[i].Repo, resolved.RepoRevs[j].Repo)
})

common := SearchResultsCommon{}
common := streaming.Stats{}
cursor, results, fileCommon, err := paginatedSearchFilesInRepos(ctx, &args, r.pagination)
if err != nil {
return nil, err
}
common.update(fileCommon)
common.Update(fileCommon)

tr.LazyPrintf("results=%d %s", len(results), &common)

Expand All @@ -199,11 +200,11 @@ func (r *searchResolver) paginatedResults(ctx context.Context) (result *SearchRe
)

return &SearchResultsResolver{
start: start,
SearchResultsCommon: common,
SearchResults: results,
alert: alert,
cursor: cursor,
start: start,
Stats: common,
SearchResults: results,
alert: alert,
cursor: cursor,
}, nil
}

Expand Down Expand Up @@ -239,7 +240,7 @@ func repoIsLess(i, j *types.RepoName) bool {
// top of the penalty we incur from the larger `count:` mentioned in point
// 2 above (in the worst case scenario).
//
func paginatedSearchFilesInRepos(ctx context.Context, args *search.TextParameters, pagination *searchPaginationInfo) (*searchCursor, []SearchResultResolver, *SearchResultsCommon, error) {
func paginatedSearchFilesInRepos(ctx context.Context, args *search.TextParameters, pagination *searchPaginationInfo) (*searchCursor, []SearchResultResolver, *streaming.Stats, error) {
repos, err := getRepos(ctx, args.RepoPromise)
if err != nil {
return nil, nil, nil, err
Expand All @@ -252,18 +253,18 @@ func paginatedSearchFilesInRepos(ctx context.Context, args *search.TextParameter
searchBucketMin: 10,
searchBucketMax: 1000,
}
return plan.execute(ctx, func(batch []*search.RepositoryRevisions) ([]SearchResultResolver, *SearchResultsCommon, error) {
return plan.execute(ctx, func(batch []*search.RepositoryRevisions) ([]SearchResultResolver, *streaming.Stats, error) {
batchArgs := *args
batchArgs.RepoPromise = (&search.Promise{}).Resolve(batch)
fileResults, fileCommon, err := searchFilesInRepos(ctx, &batchArgs, nil)
// Timeouts are reported through SearchResultsCommon so don't report an error for them
// Timeouts are reported through Stats so don't report an error for them
if err != nil && !(err == context.DeadlineExceeded || err == context.Canceled) {
return nil, nil, err
}
if fileCommon == nil {
// searchFilesInRepos can return a nil structure, but the executor
// requires a non-nil one always (which is more sane).
fileCommon = &SearchResultsCommon{}
fileCommon = &streaming.Stats{}
}
// fileResults is not sorted so we must sort it now. fileCommon may or
// may not be sorted, but we do not rely on its order.
Expand Down Expand Up @@ -310,9 +311,9 @@ type repoPaginationPlan struct {

// executor is a function which searches a batch of repositories.
//
// A non-nil SearchResultsCommon must always be returned, even if an error is
// A non-nil Stats must always be returned, even if an error is
// returned.
type executor func(batch []*search.RepositoryRevisions) ([]SearchResultResolver, *SearchResultsCommon, error)
type executor func(batch []*search.RepositoryRevisions) ([]SearchResultResolver, *streaming.Stats, error)

// repoOfResult is a helper function to resolve the repo associated with a result type.
func repoOfResult(result SearchResultResolver) string {
Expand All @@ -337,7 +338,7 @@ func repoOfResult(result SearchResultResolver) string {
//
// If the executor returns any error, the search will be cancelled and the error
// returned.
func (p *repoPaginationPlan) execute(ctx context.Context, exec executor) (c *searchCursor, results []SearchResultResolver, common *SearchResultsCommon, err error) {
func (p *repoPaginationPlan) execute(ctx context.Context, exec executor) (c *searchCursor, results []SearchResultResolver, common *streaming.Stats, err error) {
// Determine how large the batches of repositories we will search over will be.
var totalRepos int
if p.mockNumTotalRepos != nil {
Expand All @@ -363,15 +364,15 @@ func (p *repoPaginationPlan) execute(ctx context.Context, exec executor) (c *sea
repos = repos[repositoryOffset:]
}

// Search backends don't populate SearchResultsCommon.repos, the
// Search backends don't populate Stats.repos, the
// repository searcher does. We need to do that here.
commonRepos := make(map[api.RepoID]*types.RepoName, len(repos))
for _, r := range repos {
commonRepos[r.Repo.ID] = r.Repo
}

// Search over the repos list in batches.
common = &SearchResultsCommon{
common = &streaming.Stats{
Repos: commonRepos,
}
for start := 0; start <= len(repos); start += batchSize {
Expand All @@ -382,15 +383,15 @@ func (p *repoPaginationPlan) execute(ctx context.Context, exec executor) (c *sea
batch := repos[start:clamp(start+batchSize, 0, len(repos))]
batchResults, batchCommon, err := exec(batch)
if batchCommon == nil {
panic("never here: repoPaginationPlan.executor illegally returned nil SearchResultsCommon structure")
panic("never here: repoPaginationPlan.executor illegally returned nil Stats structure")
}
if err != nil {
return nil, nil, nil, err
}

// Accumulate the results and stop if we have enough for the user.
results = append(results, batchResults...)
common.update(batchCommon)
common.Update(batchCommon)

if len(results) >= resultOffset+int(p.pagination.limit) {
break
Expand Down Expand Up @@ -447,7 +448,7 @@ type slicedSearchResults struct {
results []SearchResultResolver

// common is the new common results structure, updated to reflect the sliced results only.
common *SearchResultsCommon
common *streaming.Stats

// resultOffset indicates where the search would continue within the last
// repository whose results were consumed. For example:
Expand All @@ -463,9 +464,9 @@ type slicedSearchResults struct {
}

// sliceSearchResults effectively slices results[offset:offset+limit] and
// returns an updated SearchResultsCommon structure to reflect that, as well as
// returns an updated Stats structure to reflect that, as well as
// information about the slicing that was performed.
func sliceSearchResults(results []SearchResultResolver, common *SearchResultsCommon, offset, limit int) (final slicedSearchResults) {
func sliceSearchResults(results []SearchResultResolver, common *streaming.Stats, offset, limit int) (final slicedSearchResults) {
firstRepo := ""
if len(results[:offset]) > 0 {
firstRepo = repoOfResult(results[offset])
Expand Down Expand Up @@ -526,7 +527,7 @@ func sliceSearchResults(results []SearchResultResolver, common *SearchResultsCom
final.resultOffset++
}

// Construct the new SearchResultsCommon structure for just the results
// Construct the new Stats structure for just the results
// we're returning.
seenRepos := map[string]struct{}{}
finalResults := make([]SearchResultResolver, 0, limit)
Expand All @@ -548,11 +549,11 @@ func sliceSearchResults(results []SearchResultResolver, common *SearchResultsCom
return
}

func sliceSearchResultsCommon(common *SearchResultsCommon, firstResultRepo, lastResultRepo string) *SearchResultsCommon {
func sliceSearchResultsCommon(common *streaming.Stats, firstResultRepo, lastResultRepo string) *streaming.Stats {
if common.Status.Any(search.RepoStatusLimitHit) {
panic("never here: partial results should never be present in paginated search")
}
final := &SearchResultsCommon{
final := &streaming.Stats{
IsLimitHit: false, // irrelevant in paginated search
IsIndexUnavailable: common.IsIndexUnavailable,
Repos: make(map[api.RepoID]*types.RepoName),
Expand Down
Loading

0 comments on commit b32224c

Please sign in to comment.