diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 8cfd0b44e8102..309dedb0b99b9 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -474,9 +474,10 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi its := make([]iter.EntryIterator, 0, len(c.blocks)+1) for _, b := range c.blocks { - if maxt > b.mint && b.maxt > mint { - its = append(its, b.iterator(ctx, c.readers, filter)) + if maxt < b.mint || b.maxt < mint { + continue } + its = append(its, b.iterator(ctx, c.readers, filter)) } if !c.head.isEmpty() { diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 72b1961d41e63..9d6714507d143 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -10,9 +10,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/dustin/go-humanize" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/chunkenc/testdata" @@ -644,3 +643,73 @@ func BenchmarkHeadBlockIterator(b *testing.B) { }) } } + +func TestMemChunk_IteratorBounds(t *testing.T) { + + var createChunk = func() *MemChunk { + t.Helper() + c := NewMemChunk(EncNone, 1e6, 1e6) + + if err := c.Append(&logproto.Entry{ + Timestamp: time.Unix(0, 1), + Line: "1", + }); err != nil { + t.Fatal(err) + } + if err := c.Append(&logproto.Entry{ + Timestamp: time.Unix(0, 2), + Line: "2", + }); err != nil { + t.Fatal(err) + } + return c + } + + for _, tt := range []struct { + mint, maxt time.Time + direction logproto.Direction + expect []bool // array of expected values for next call in sequence + }{ + {time.Unix(0, 0), time.Unix(0, 1), logproto.FORWARD, []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 2), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), logproto.FORWARD, []bool{true, true, false}}, + {time.Unix(0, 2), time.Unix(0, 3), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), logproto.FORWARD, []bool{false}}, + + {time.Unix(0, 0), time.Unix(0, 1), logproto.BACKWARD, []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 2), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), logproto.BACKWARD, []bool{true, true, false}}, + {time.Unix(0, 2), time.Unix(0, 3), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), logproto.BACKWARD, []bool{false}}, + } { + t.Run( + fmt.Sprintf("mint:%d,maxt:%d,direction:%s", tt.mint.UnixNano(), tt.maxt.UnixNano(), tt.direction), + func(t *testing.T) { + tt := tt + c := createChunk() + + // testing headchunk + it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil) + require.NoError(t, err) + for i := range tt.expect { + require.Equal(t, tt.expect[i], it.Next()) + } + require.NoError(t, it.Close()) + + // testing chunk blocks + require.NoError(t, c.cut()) + it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil) + require.NoError(t, err) + for i := range tt.expect { + require.Equal(t, tt.expect[i], it.Next()) + } + require.NoError(t, it.Close()) + }) + + } + +} diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 45ebff74d46eb..4c5669b20f773 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -503,11 +503,18 @@ func (i *timeRangedIterator) Next() bool { ts := i.EntryIterator.Entry().Timestamp for ok && i.mint.After(ts) { ok = i.EntryIterator.Next() + if !ok { + continue + } ts = i.EntryIterator.Entry().Timestamp } - - if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive. - ok = false + if ok { + if ts.Equal(i.mint) { // The mint is inclusive + return true + } + if i.maxt.Before(ts) || i.maxt.Equal(ts) { // The maxt is exclusive. + ok = false + } } if !ok { i.EntryIterator.Close() diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index c02246fc0e26d..7a7f3cbaddfbf 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -7,9 +7,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/stats" @@ -525,3 +524,40 @@ func Test_DuplicateCount(t *testing.T) { }) } } + +func Test_timeRangedIterator_Next(t *testing.T) { + + tests := []struct { + mint time.Time + maxt time.Time + expect []bool // array of expected values for next call in sequence + }{ + {time.Unix(0, 0), time.Unix(0, 0), []bool{false}}, + {time.Unix(0, 0), time.Unix(0, 1), []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), []bool{true, true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), []bool{true, false}}, + {time.Unix(0, 4), time.Unix(0, 10), []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 10), []bool{true, true, true, false}}, + {time.Unix(0, 0), time.Unix(0, 10), []bool{true, true, true, false}}, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("mint:%d maxt:%d", tt.mint.UnixNano(), tt.maxt.UnixNano()), func(t *testing.T) { + i := NewTimeRangedIterator( + NewStreamIterator( + logproto.Stream{Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1)}, + {Timestamp: time.Unix(0, 2)}, + {Timestamp: time.Unix(0, 3)}, + }}), + tt.mint, + tt.maxt, + ) + for _, b := range tt.expect { + require.Equal(t, b, i.Next()) + } + require.NoError(t, i.Close()) + }) + } +} diff --git a/pkg/logql/range_vector_test.go b/pkg/logql/range_vector_test.go index af2446c61c00a..c5138bf884359 100644 --- a/pkg/logql/range_vector_test.go +++ b/pkg/logql/range_vector_test.go @@ -122,7 +122,7 @@ func Test_RangeVectorIterator(t *testing.T) { time.Unix(10, 0), time.Unix(100, 0), }, { - (50 * time.Second).Nanoseconds(), // all step are overlaping + (50 * time.Second).Nanoseconds(), // all step are overlapping (10 * time.Second).Nanoseconds(), []promql.Vector{ []promql.Sample{ diff --git a/pkg/promtail/client/config_test.go b/pkg/promtail/client/config_test.go index 74d8c04c2814d..42fbc46dc083c 100644 --- a/pkg/promtail/client/config_test.go +++ b/pkg/promtail/client/config_test.go @@ -41,7 +41,7 @@ func Test_Config(t *testing.T) { clientDefaultConfig, Config{ URL: flagext.URLValue{ - u, + URL: u, }, BackoffConfig: util.BackoffConfig{ MaxBackoff: 5 * time.Minute, @@ -57,7 +57,7 @@ func Test_Config(t *testing.T) { clientCustomConfig, Config{ URL: flagext.URLValue{ - u, + URL: u, }, BackoffConfig: util.BackoffConfig{ MaxBackoff: 1 * time.Minute, @@ -75,7 +75,7 @@ func Test_Config(t *testing.T) { require.NoError(t, err) if !reflect.DeepEqual(tc.expectedConfig, clientConfig) { - t.Errorf("Configs does not match, expected: %v, recieved: %v", tc.expectedConfig, clientConfig) + t.Errorf("Configs does not match, expected: %v, received: %v", tc.expectedConfig, clientConfig) } } } diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index 6ad0fb98d3bec..3f2c297a3d1a3 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go @@ -470,17 +470,8 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if _, ok := h.receivedMap[file]; ok { - h.receivedMap[file] = append(h.receivedMap[file], s.Entries...) - } else { - h.receivedMap[file] = s.Entries - } - - if _, ok := h.receivedLabels[file]; ok { - h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels) - } else { - h.receivedLabels[file] = []labels.Labels{parsedLabels} - } + h.receivedMap[file] = append(h.receivedMap[file], s.Entries...) + h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels) } diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go index ed0e306f3fd9b..02e0dcac10611 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -243,7 +243,9 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) { } else { from = time.Unix(0, headChunk.Chunk.From.UnixNano()) - if from.Before(it.req.Start) { + // when clipping the from it should never be before the start or equal to the end. + // Doing so would include entries not requested. + if from.Before(it.req.Start) || from.Equal(it.req.End) { from = it.req.Start } }