Skip to content

Commit

Permalink
Reduce to a single code path through sendBatches regardless of limit (#…
Browse files Browse the repository at this point in the history
…6216)

* Reduce to a single code path through sendBatches regardless of limit specified

* simplify sendBatches by using -1 to specify no limit

* test fix
  • Loading branch information
splitice authored Jun 3, 2022
1 parent 135f672 commit de418bf
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 28 deletions.
8 changes: 7 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,13 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie

defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)

return sendBatches(ctx, it, queryServer, req.Limit)
// sendBatches uses -1 to specify no limit.
batchLimit := int32(req.Limit)
if batchLimit == 0 {
batchLimit = -1
}

return sendBatches(ctx, it, queryServer, batchLimit)
}

// QuerySample the ingesters for series from logs matching a set of matchers.
Expand Down
37 changes: 11 additions & 26 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,38 +716,23 @@ type QuerierQueryServer interface {
Send(res *logproto.QueryResponse) error
}

func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit uint32) error {
func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) error {
stats := stats.FromContext(ctx)
if limit == 0 {
// send all batches.
for !isDone(ctx) {
batch, size, err := iter.ReadBatch(i, queryBatchSize)
if err != nil {
return err
}
if len(batch.Streams) == 0 {
return nil
}
stats.AddIngesterBatch(int64(size))
batch.Stats = stats.Ingester()

if err := queryServer.Send(batch); err != nil {
return err
}

stats.Reset()

}
return nil
}
// send until the limit is reached.
sent := uint32(0)
for sent < limit && !isDone(queryServer.Context()) {
batch, batchSize, err := iter.ReadBatch(i, math.MinUint32(queryBatchSize, limit-sent))
for limit != 0 && !isDone(ctx) {
fetchSize := uint32(queryBatchSize)
if limit > 0 {
fetchSize = math.MinUint32(queryBatchSize, uint32(limit))
}
batch, batchSize, err := iter.ReadBatch(i, fetchSize)
if err != nil {
return err
}
sent += batchSize

if limit > 0 {
limit -= int32(batchSize)
}

if len(batch.Streams) == 0 {
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func Test_Iterator(t *testing.T) {
return nil
},
),
uint32(2)),
int32(2)),
)
require.Equal(t, 2, len(res.Streams))
// each entry translated into a unique stream
Expand Down

0 comments on commit de418bf

Please sign in to comment.