Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: fix missing logs with batched chunk iterator #1299

Merged
merged 5 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 62 additions & 13 deletions pkg/storage/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ type batchChunkIterator struct {

// newBatchChunkIterator creates a new batch iterator with the given batchSize.
func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, batchSize int, matchers []*labels.Matcher, filter logql.Filter, req *logproto.QueryRequest) *batchChunkIterator {

// __name__ is not something we filter by because it's a constant in loki and only used for upstream compatibility.
// Therefore remove it
for i := range matchers {
if matchers[i].Name == labels.MetricName {
matchers = append(matchers[:i], matchers[i+1:]...)
break
}
}

res := &batchChunkIterator{
batchSize: batchSize,
matchers: matchers,
Expand Down Expand Up @@ -112,6 +122,9 @@ func (it *batchChunkIterator) Next() bool {
}

func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// the first chunk of the batch
headChunk := it.chunks.Peek()

// pop the next batch of chunks and append/preprend previous overlapping chunks
// so we can merge/de-dupe overlapping entries.
batch := make([]*chunkenc.LazyChunk, 0, it.batchSize+len(it.lastOverlapping))
Expand All @@ -130,6 +143,14 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// so that overlapping chunks are together
if it.req.Direction == logproto.BACKWARD {
from = time.Unix(0, nextChunk.Chunk.Through.UnixNano())

// we have to reverse the inclusivity of the chunk iterator from
// [from, through) to (from, through] for backward queries, except when
// the batch's `from` is equal to the query's Start. This can be achieved
// by shifting `from` by one nanosecond.
if !from.Equal(it.req.Start) {
from = from.Add(time.Nanosecond)
}
} else {
through = time.Unix(0, nextChunk.Chunk.From.UnixNano())
}
Expand All @@ -149,7 +170,7 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// └────────────────────┘
//
// And nextChunk is # 49, we need to keep references to #47 and #48 as they won't be
// iterated over completely (we're clipping through to #49's from) and then add them to the next batch.
// iterated over completely (we're clipping through to #49's from) and then add them to the next batch.
it.lastOverlapping = it.lastOverlapping[:0]
for _, c := range batch {
if it.req.Direction == logproto.BACKWARD {
Expand All @@ -162,13 +183,27 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
}
}
}
}

if it.req.Direction == logproto.BACKWARD {
through = time.Unix(0, headChunk.Chunk.Through.UnixNano())

if through.After(it.req.End) {
through = it.req.End
}

// we have to reverse the inclusivity of the chunk iterator from
// [from, through) to (from, through] for backward queries, except when
// the batch's `through` is equal to the query's End. This can be achieved
// by shifting `through` by one nanosecond.
if !through.Equal(it.req.End) {
through = through.Add(time.Nanosecond)
}
} else {
if len(it.lastOverlapping) > 0 {
if it.req.Direction == logproto.BACKWARD {
through = time.Unix(0, it.lastOverlapping[0].Chunk.From.UnixNano())
} else {
from = time.Unix(0, it.lastOverlapping[0].Chunk.Through.UnixNano())
}
from = time.Unix(0, headChunk.Chunk.From.UnixNano())

if from.Before(it.req.Start) {
from = it.req.Start
}
}

Expand Down Expand Up @@ -250,12 +285,9 @@ func buildIterators(ctx context.Context, chks map[model.Fingerprint][][]*chunken

func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter logql.Filter, direction logproto.Direction, from, through time.Time) (iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
if chks[0][0].Chunk.Metric.Has("__name__") {
labelsBuilder := labels.NewBuilder(chks[0][0].Chunk.Metric)
labelsBuilder.Del("__name__")
chks[0][0].Chunk.Metric = labelsBuilder.Labels()
}
labels := chks[0][0].Chunk.Metric.String()

// __name__ is only used for upstream compatibility and is hardcoded within loki. Strip it from the return label set.
labels := dropLabels(chks[0][0].Chunk.Metric, labels.MetricName).String()

for i := range chks {
iterators := make([]iter.EntryIterator, 0, len(chks[i]))
Expand Down Expand Up @@ -400,3 +432,20 @@ outer:

return css
}

// dropLabels returns a new label set with certain labels dropped
func dropLabels(ls labels.Labels, removals ...string) (dst labels.Labels) {
toDel := make(map[string]struct{})
for _, r := range removals {
toDel[r] = struct{}{}
}

for _, l := range ls {
_, remove := toDel[l.Name]
if !remove {
dst = append(dst, l)
}
}

return dst
}
Loading