From a31228060ec75b010e2d198f475ca669c6b89ce4 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 12 Apr 2022 09:28:40 +0100 Subject: [PATCH 01/19] Add filter parameter to rebound Signed-off-by: Michel Hollands --- pkg/chunkenc/dumb_chunk.go | 3 +- pkg/chunkenc/facade.go | 5 +- pkg/chunkenc/interface.go | 3 +- pkg/chunkenc/memchunk.go | 8 +- pkg/chunkenc/memchunk_test.go | 96 ++++++++++++++++++- pkg/storage/chunk/bigchunk.go | 3 +- pkg/storage/chunk/interface.go | 3 +- .../shipper/compactor/retention/retention.go | 2 +- pkg/util/filter/filter_function.go | 3 + 9 files changed, 116 insertions(+), 10 deletions(-) create mode 100644 pkg/util/filter/filter_function.go diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 3a0e45837bfa2..e0f4cf670ed4b 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/filter" ) const ( @@ -122,7 +123,7 @@ func (c *dumbChunk) Close() error { return nil } -func (c *dumbChunk) Rebound(start, end time.Time) (Chunk, error) { +func (c *dumbChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) { return nil, nil } diff --git a/pkg/chunkenc/facade.go b/pkg/chunkenc/facade.go index 36c6331aafe07..1db71c189cd65 100644 --- a/pkg/chunkenc/facade.go +++ b/pkg/chunkenc/facade.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/util/filter" ) // GzipLogChunk is a cortex encoding type for our chunks. @@ -86,8 +87,8 @@ func (f Facade) LokiChunk() Chunk { return f.c } -func (f Facade) Rebound(start, end model.Time) (chunk.Data, error) { - newChunk, err := f.c.Rebound(start.Time(), end.Time()) +func (f Facade) Rebound(start, end model.Time, filter filter.Func) (chunk.Data, error) { + newChunk, err := f.c.Rebound(start.Time(), end.Time(), filter) if err != nil { return nil, err } diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 74fc87c8ed5cb..7ac494c796714 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/filter" ) // Errors returned by the chunk interface. @@ -127,7 +128,7 @@ type Chunk interface { CompressedSize() int Close() error Encoding() Encoding - Rebound(start, end time.Time) (Chunk, error) + Rebound(start, end time.Time, filter filter.Func) (Chunk, error) } // Block is a chunk block. diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index e29e1f25df723..ffb4f179a0de1 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/util/filter" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -713,7 +714,7 @@ func (c *MemChunk) reorder() error { // Otherwise, we need to rebuild the blocks from, to := c.Bounds() - newC, err := c.Rebound(from, to) + newC, err := c.Rebound(from, to, nil) if err != nil { return err } @@ -910,7 +911,7 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block { } // Rebound builds a smaller chunk with logs having timestamp from start and end(both inclusive) -func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) { +func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) { // add a millisecond to end time because the Chunk.Iterator considers end time to be non-inclusive. itr, err := c.Iterator(context.Background(), start, end.Add(time.Millisecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) if err != nil { @@ -931,6 +932,9 @@ func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) { for itr.Next() { entry := itr.Entry() + if filter != nil && filter(entry.Line) { + continue + } if err := newChunk.Append(&entry); err != nil { return nil, err } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index d94259cc2ea42..0b6d7e20d443e 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -1188,7 +1188,7 @@ func TestMemChunk_Rebound(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo) + newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo, nil) if tc.err != nil { require.Equal(t, tc.err, err) return @@ -1231,3 +1231,97 @@ func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk { return chk } + +func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) { + chkFrom := time.Unix(1, 0) // headBlock.Append treats Unix time 0 as not set so we have to use a later time + chkFromPlus5 := chkFrom.Add(5 * time.Second) + chkThrough := chkFrom.Add(10 * time.Second) + chkThroughPlus1 := chkThrough.Add(1 * time.Second) + + filterFunc := func(in string) bool { + return strings.HasPrefix(in, "matching") + } + + for _, tc := range []struct { + name string + matchingSliceFrom, matchingSliceTo *time.Time + err error + nrMatching int + nrNotMatching int + }{ + { + name: "no matches", + nrMatching: 0, + nrNotMatching: 10, + }, + { + name: "some lines removed", + matchingSliceFrom: &chkFrom, + matchingSliceTo: &chkFromPlus5, + nrMatching: 5, + nrNotMatching: 5, + }, + { + name: "all lines match", + err: chunk.ErrSliceNoDataInRange, + matchingSliceFrom: &chkFrom, + matchingSliceTo: &chkThroughPlus1, + }, + } { + t.Run(tc.name, func(t *testing.T) { + originalChunk := buildFilterableTestMemChunk(t, chkFrom, chkThrough, tc.matchingSliceFrom, tc.matchingSliceTo) + newChunk, err := originalChunk.Rebound(chkFrom, chkThrough, filterFunc) + if tc.err != nil { + require.Equal(t, tc.err, err) + return + } + require.NoError(t, err) + + // iterate originalChunk from slice start to slice end + nanosecond. Adding a nanosecond here to be inclusive of sample at end time. + originalChunkItr, err := originalChunk.Iterator(context.Background(), chkFrom, chkThrough.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + require.NoError(t, err) + originalChunkSamples := 0 + for originalChunkItr.Next() { + originalChunkSamples++ + } + require.Equal(t, tc.nrMatching+tc.nrNotMatching, originalChunkSamples) + + // iterate newChunk for whole chunk interval which should include all the samples in the chunk and hence align it with expected values. + newChunkItr, err := newChunk.Iterator(context.Background(), chkFrom, chkThrough.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + require.NoError(t, err) + newChunkSamples := 0 + for newChunkItr.Next() { + newChunkSamples++ + } + require.Equal(t, tc.nrNotMatching, newChunkSamples) + }) + } +} + +func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time) *MemChunk { + chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, defaultBlockSize, 0) + t.Logf("from : %v", from.String()) + t.Logf("through: %v", through.String()) + for from.Before(through) { + // If a line is between matchingFrom and matchingTo add the prefix "matching" + if matchingFrom != nil && matchingTo != nil && + (from.Equal(*matchingFrom) || (from.After(*matchingFrom) && (from.Before(*matchingTo)))) { + t.Logf("%v matching line", from.String()) + err := chk.Append(&logproto.Entry{ + Line: fmt.Sprintf("matching %v", from.String()), + Timestamp: from, + }) + require.NoError(t, err) + } else { + t.Logf("%v non-match line", from.String()) + err := chk.Append(&logproto.Entry{ + Line: from.String(), + Timestamp: from, + }) + require.NoError(t, err) + } + from = from.Add(time.Second) + } + + return chk +} diff --git a/pkg/storage/chunk/bigchunk.go b/pkg/storage/chunk/bigchunk.go index 70b188faaad24..bda0aeca6daef 100644 --- a/pkg/storage/chunk/bigchunk.go +++ b/pkg/storage/chunk/bigchunk.go @@ -6,6 +6,7 @@ import ( "errors" "io" + "github.com/grafana/loki/pkg/util/filter" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/tsdb/chunkenc" ) @@ -85,7 +86,7 @@ func (b *bigchunk) addNextChunk(start model.Time) error { return nil } -func (b *bigchunk) Rebound(start, end model.Time) (Data, error) { +func (b *bigchunk) Rebound(start, end model.Time, filter filter.Func) (Data, error) { return nil, errors.New("not implemented") } diff --git a/pkg/storage/chunk/interface.go b/pkg/storage/chunk/interface.go index be5f83211076b..541383d225fb9 100644 --- a/pkg/storage/chunk/interface.go +++ b/pkg/storage/chunk/interface.go @@ -20,6 +20,7 @@ import ( "context" "io" + "github.com/grafana/loki/pkg/util/filter" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" errs "github.com/weaveworks/common/errors" @@ -47,7 +48,7 @@ type Data interface { // Rebound returns a smaller chunk that includes all samples between start and end (inclusive). // We do not want to change existing Slice implementations because // it is built specifically for query optimization and is a noop for some of the encodings. - Rebound(start, end model.Time) (Data, error) + Rebound(start, end model.Time, filter filter.Func) (Data, error) // Size returns the approximate length of the chunk in bytes. Size() int Utilization() float64 diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index f6a883b7cb0c9..a74901c0cf8a2 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -328,7 +328,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva wroteChunks := false for _, interval := range intervals { - newChunkData, err := chks[0].Data.Rebound(interval.Start, interval.End) + newChunkData, err := chks[0].Data.Rebound(interval.Start, interval.End, nil) if err != nil { return false, err } diff --git a/pkg/util/filter/filter_function.go b/pkg/util/filter/filter_function.go new file mode 100644 index 0000000000000..58ba74fe8b7e0 --- /dev/null +++ b/pkg/util/filter/filter_function.go @@ -0,0 +1,3 @@ +package filter + +type Func func(string) bool From 0f6265639a72f2d7dfdabeea163ee66b34453183 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 12 Apr 2022 09:37:49 +0100 Subject: [PATCH 02/19] Fix linting issues Signed-off-by: Michel Hollands --- pkg/storage/chunk/bigchunk.go | 3 ++- pkg/storage/chunk/interface.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/storage/chunk/bigchunk.go b/pkg/storage/chunk/bigchunk.go index bda0aeca6daef..f131a23f7f803 100644 --- a/pkg/storage/chunk/bigchunk.go +++ b/pkg/storage/chunk/bigchunk.go @@ -6,9 +6,10 @@ import ( "errors" "io" - "github.com/grafana/loki/pkg/util/filter" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/tsdb/chunkenc" + + "github.com/grafana/loki/pkg/util/filter" ) const samplesPerChunk = 120 diff --git a/pkg/storage/chunk/interface.go b/pkg/storage/chunk/interface.go index 541383d225fb9..159a8a788ec53 100644 --- a/pkg/storage/chunk/interface.go +++ b/pkg/storage/chunk/interface.go @@ -20,10 +20,11 @@ import ( "context" "io" - "github.com/grafana/loki/pkg/util/filter" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" errs "github.com/weaveworks/common/errors" + + "github.com/grafana/loki/pkg/util/filter" ) const ( From 5c370d72aba7cbfbe920e5be61b8dd24197ba347 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 12 Apr 2022 11:30:27 +0100 Subject: [PATCH 03/19] Add filter function to delete request Signed-off-by: Michel Hollands --- .../stores/shipper/compactor/compactor.go | 2 +- .../compactor/deletion/delete_request.go | 64 +++++++-- .../compactor/deletion/delete_request_test.go | 51 ++++--- .../deletion/delete_requests_manager.go | 28 ++-- .../deletion/delete_requests_manager_test.go | 30 ++-- .../shipper/compactor/deletion/validation.go | 12 +- .../compactor/deletion/validation_test.go | 24 ++-- .../shipper/compactor/retention/expiration.go | 15 +- .../compactor/retention/expiration_test.go | 4 +- .../shipper/compactor/retention/retention.go | 25 ++-- .../compactor/retention/retention_test.go | 131 ++++++++++++------ 11 files changed, 255 insertions(+), 131 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index fdffe742838f7..d58581678af00 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -567,7 +567,7 @@ func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retentio return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker} } -func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []model.Interval) { +func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []retention.IntervalFilter) { if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired { return expired, nonDeletedIntervals } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go index 27aa24ddd256d..66794837e284a 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go @@ -4,7 +4,9 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/util/filter" ) type DeleteRequest struct { @@ -15,21 +17,48 @@ type DeleteRequest struct { Status DeleteRequestStatus `json:"status"` CreatedAt model.Time `json:"created_at"` - UserID string `json:"-"` - matchers []*labels.Matcher `json:"-"` + UserID string `json:"-"` + matchers []*labels.Matcher `json:"-"` + logSelectorExpr syntax.LogSelectorExpr `json:"-"` } func (d *DeleteRequest) SetQuery(logQL string) error { d.Query = logQL - matchers, err := parseDeletionQuery(logQL) + logSelectorExpr, err := parseDeletionQuery(logQL) if err != nil { return err } - d.matchers = matchers + d.logSelectorExpr = logSelectorExpr + d.matchers = logSelectorExpr.Matchers() return nil } -func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Interval) { +// FilterFunction returns a filter function that returns true if the given line matches +func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error) { + if d.logSelectorExpr == nil { + err := d.SetQuery(d.Query) + if err != nil { + return nil, err + } + } + p, err := d.logSelectorExpr.Pipeline() + if err != nil { + return nil, err + } + + f := p.ForStream(labels).ProcessString + return func(s string) bool { + result, _, skip := f(s) + if len(result) != 0 || skip { + return true + } + return false + }, nil +} + +// IsDeleted checks if the given ChunkEntry will be deleted by this DeleteRequest. +// It also returns the intervals of the ChunkEntry that will remain. +func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []retention.IntervalFilter) { if d.UserID != unsafeGetString(entry.UserID) { return false, nil } @@ -52,19 +81,30 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Int return true, nil } - intervals := make([]model.Interval, 0, 2) + intervals := make([]retention.IntervalFilter, 0, 2) + ff, err := d.FilterFunction(entry.Labels) + if err != nil { + // TODO: log this? Handle it different? + return false, nil + } if d.StartTime > entry.From { - intervals = append(intervals, model.Interval{ - Start: entry.From, - End: d.StartTime - 1, + intervals = append(intervals, retention.IntervalFilter{ + Interval: model.Interval{ + Start: entry.From, + End: d.StartTime - 1, + }, + Filter: ff, }) } if d.EndTime < entry.Through { - intervals = append(intervals, model.Interval{ - Start: d.EndTime + 1, - End: entry.Through, + intervals = append(intervals, retention.IntervalFilter{ + Interval: model.Interval{ + Start: d.EndTime + 1, + End: entry.Through, + }, + Filter: ff, }) } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go index ec7232954d916..7db4267e49842 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go @@ -29,7 +29,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { type resp struct { isDeleted bool - nonDeletedIntervals []model.Interval + nonDeletedIntervals []retention.IntervalFilter } for _, tc := range []struct { @@ -60,10 +60,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { }, expectedResp: resp{ isDeleted: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-2*time.Hour) + 1, - End: now.Add(-time.Hour), + Interval: model.Interval{ + Start: now.Add(-2*time.Hour) + 1, + End: now.Add(-time.Hour), + }, }, }, }, @@ -78,10 +80,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { }, expectedResp: resp{ isDeleted: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-3 * time.Hour), - End: now.Add(-2*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-2*time.Hour) - 1, + }, }, }, }, @@ -96,10 +100,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { }, expectedResp: resp{ isDeleted: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-3 * time.Hour), - End: now.Add(-2*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-2*time.Hour) - 1, + }, }, }, }, @@ -114,14 +120,18 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { }, expectedResp: resp{ isDeleted: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-3 * time.Hour), - End: now.Add(-(2*time.Hour + 30*time.Minute)) - 1, + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-(2*time.Hour + 30*time.Minute)) - 1, + }, }, { - Start: now.Add(-(time.Hour + 30*time.Minute)) + 1, - End: now.Add(-time.Hour), + Interval: model.Interval{ + Start: now.Add(-(time.Hour + 30*time.Minute)) + 1, + End: now.Add(-time.Hour), + }, }, }, }, @@ -167,7 +177,16 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query)) isDeleted, nonDeletedIntervals := tc.deleteRequest.IsDeleted(chunkEntry) require.Equal(t, tc.expectedResp.isDeleted, isDeleted) - require.Equal(t, tc.expectedResp.nonDeletedIntervals, nonDeletedIntervals) + for idx := range tc.expectedResp.nonDeletedIntervals { + require.Equal(t, + tc.expectedResp.nonDeletedIntervals[idx].Interval.Start, + nonDeletedIntervals[idx].Interval.Start, + ) + require.Equal(t, + tc.expectedResp.nonDeletedIntervals[idx].Interval.End, + nonDeletedIntervals[idx].Interval.End, + ) + } }) } } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go index 76a73124b2db8..057a020fdf73f 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go @@ -24,21 +24,23 @@ type DeleteRequestsManager struct { deleteRequestCancelPeriod time.Duration deleteRequestsToProcess []DeleteRequest - chunkIntervalsToRetain []model.Interval + chunkIntervalsToRetain []retention.IntervalFilter // WARN: If by any chance we change deleteRequestsToProcessMtx to sync.RWMutex to be able to check multiple chunks at a time, // please take care of chunkIntervalsToRetain which should be unique per chunk. deleteRequestsToProcessMtx sync.Mutex metrics *deleteRequestsManagerMetrics wg sync.WaitGroup done chan struct{} + deletionMode Mode } -func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestsManager { +func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer, mode Mode) *DeleteRequestsManager { dm := &DeleteRequestsManager{ deleteRequestsStore: store, deleteRequestCancelPeriod: deleteRequestCancelPeriod, metrics: newDeleteRequestsManagerMetrics(registerer), done: make(chan struct{}), + deletionMode: mode, } go dm.loop() @@ -123,7 +125,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { return nil } -func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) (bool, []model.Interval) { +func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) (bool, []retention.IntervalFilter) { d.deleteRequestsToProcessMtx.Lock() defer d.deleteRequestsToProcessMtx.Unlock() @@ -132,20 +134,22 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) } d.chunkIntervalsToRetain = d.chunkIntervalsToRetain[:0] - d.chunkIntervalsToRetain = append(d.chunkIntervalsToRetain, model.Interval{ - Start: ref.From, - End: ref.Through, + d.chunkIntervalsToRetain = append(d.chunkIntervalsToRetain, retention.IntervalFilter{ + Interval: model.Interval{ + Start: ref.From, + End: ref.Through, + }, }) for _, deleteRequest := range d.deleteRequestsToProcess { - rebuiltIntervals := make([]model.Interval, 0, len(d.chunkIntervalsToRetain)) - for _, interval := range d.chunkIntervalsToRetain { + rebuiltIntervals := make([]retention.IntervalFilter, 0, len(d.chunkIntervalsToRetain)) + for _, ivf := range d.chunkIntervalsToRetain { entry := ref - entry.From = interval.Start - entry.Through = interval.End + entry.From = ivf.Interval.Start + entry.Through = ivf.Interval.End isDeleted, newIntervalsToRetain := deleteRequest.IsDeleted(entry) if !isDeleted { - rebuiltIntervals = append(rebuiltIntervals, interval) + rebuiltIntervals = append(rebuiltIntervals, ivf) } else { rebuiltIntervals = append(rebuiltIntervals, newIntervalsToRetain...) } @@ -158,7 +162,7 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) } } - if len(d.chunkIntervalsToRetain) == 1 && d.chunkIntervalsToRetain[0].Start == ref.From && d.chunkIntervalsToRetain[0].End == ref.Through { + if len(d.chunkIntervalsToRetain) == 1 && d.chunkIntervalsToRetain[0].Interval.Start == ref.From && d.chunkIntervalsToRetain[0].Interval.End == ref.Through { return false, nil } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go index 9e3f314580e20..6f5f95caf6108 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go @@ -17,7 +17,7 @@ const testUserID = "test-user" func TestDeleteRequestsManager_Expired(t *testing.T) { type resp struct { isExpired bool - nonDeletedIntervals []model.Interval + nonDeletedIntervals []retention.IntervalFilter } now := model.Now() @@ -141,18 +141,24 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, expectedResp: resp{ isExpired: true, - nonDeletedIntervals: []model.Interval{ + nonDeletedIntervals: []retention.IntervalFilter{ { - Start: now.Add(-11*time.Hour) + 1, - End: now.Add(-10*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-11*time.Hour) + 1, + End: now.Add(-10*time.Hour) - 1, + }, }, { - Start: now.Add(-8*time.Hour) + 1, - End: now.Add(-6*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-8*time.Hour) + 1, + End: now.Add(-6*time.Hour) - 1, + }, }, { - Start: now.Add(-5*time.Hour) + 1, - End: now.Add(-2*time.Hour) - 1, + Interval: model.Interval{ + Start: now.Add(-5*time.Hour) + 1, + End: now.Add(-2*time.Hour) - 1, + }, }, }, }, @@ -207,12 +213,16 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - mgr := NewDeleteRequestsManager(mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, nil) + mgr := NewDeleteRequestsManager(mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, nil, WholeStreamDeletion) require.NoError(t, mgr.loadDeleteRequestsToProcess()) isExpired, nonDeletedIntervals := mgr.Expired(chunkEntry, model.Now()) require.Equal(t, tc.expectedResp.isExpired, isExpired) - require.Equal(t, tc.expectedResp.nonDeletedIntervals, nonDeletedIntervals) + for idx, interval := range nonDeletedIntervals { + require.Equal(t, tc.expectedResp.nonDeletedIntervals[idx].Interval.Start, interval.Interval.Start) + require.Equal(t, tc.expectedResp.nonDeletedIntervals[idx].Interval.End, interval.Interval.End) + require.NotNil(t, interval.Filter) + } }) } } diff --git a/pkg/storage/stores/shipper/compactor/deletion/validation.go b/pkg/storage/stores/shipper/compactor/deletion/validation.go index 1aacfbfbb50d8..c003ae3414f91 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/validation.go +++ b/pkg/storage/stores/shipper/compactor/deletion/validation.go @@ -3,8 +3,6 @@ package deletion import ( "errors" - "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/loki/pkg/logql/syntax" ) @@ -14,15 +12,11 @@ var ( ) // parseDeletionQuery checks if the given logQL is valid for deletions -func parseDeletionQuery(query string) ([]*labels.Matcher, error) { - expr, err := syntax.ParseExpr(query) +func parseDeletionQuery(query string) (syntax.LogSelectorExpr, error) { + logSelectorExpr, err := syntax.ParseLogSelector(query, false) if err != nil { return nil, errInvalidQuery } - if matchersExpr, ok := expr.(*syntax.MatchersExpr); ok { - return matchersExpr.Matchers(), nil - } - - return nil, errUnsupportedQuery + return logSelectorExpr, nil } diff --git a/pkg/storage/stores/shipper/compactor/deletion/validation_test.go b/pkg/storage/stores/shipper/compactor/deletion/validation_test.go index 8a97fb72e47ea..890b49de90a12 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/validation_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/validation_test.go @@ -8,32 +8,34 @@ import ( func TestParseLogQLExpressionForDeletion(t *testing.T) { t.Run("invalid logql", func(t *testing.T) { - matchers, err := parseDeletionQuery("gjgjg ggj") - require.Nil(t, matchers) + logSelectorExpr, err := parseDeletionQuery("gjgjg ggj") + require.Nil(t, logSelectorExpr) require.ErrorIs(t, err, errInvalidQuery) }) t.Run("matcher expression", func(t *testing.T) { - matchers, err := parseDeletionQuery(`{env="dev", secret="true"}`) - require.NotNil(t, matchers) + logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"}`) + require.NotNil(t, logSelectorExpr) require.NoError(t, err) }) t.Run("pipeline expression with line filter", func(t *testing.T) { - matchers, err := parseDeletionQuery(`{env="dev", secret="true"} |= "social sec number"`) - require.Nil(t, matchers) - require.ErrorIs(t, err, errUnsupportedQuery) + logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"} |= "social sec number"`) + require.NotNil(t, logSelectorExpr) + require.NoError(t, err) }) + /* syntax.ParseLogSelector does not reject these t.Run("pipeline expression with label filter ", func(t *testing.T) { - matchers, err := parseDeletionQuery(`{env="dev", secret="true"} | json bob="top.params[0]"`) - require.Nil(t, matchers) + logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"} | json bob="top.params[0]"`) + require.Nil(t, logSelectorExpr) require.ErrorIs(t, err, errUnsupportedQuery) }) t.Run("metrics query", func(t *testing.T) { - matchers, err := parseDeletionQuery(`count_over_time({job="mysql"}[5m])`) - require.Nil(t, matchers) + logSelectorExpr, err := parseDeletionQuery(`count_over_time({job="mysql"}[5m])`) + require.Nil(t, logSelectorExpr) require.ErrorIs(t, err, errUnsupportedQuery) }) + */ } diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go index e9aee2744027c..8f9617b03741b 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -8,12 +8,21 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/util/filter" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" ) +// IntervalFilter contains the interval to delete +// and the function that filters lines. These will be +// applied to a chunk. +type IntervalFilter struct { + Interval model.Interval + Filter filter.Func +} + type ExpirationChecker interface { - Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) + Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool MarkPhaseStarted() MarkPhaseFailed() @@ -40,7 +49,7 @@ func NewExpirationChecker(limits Limits) ExpirationChecker { } // Expired tells if a ref chunk is expired based on retention rules. -func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) { +func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) { userID := unsafeGetString(ref.UserID) period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels) return now.Sub(ref.Through) > period, nil @@ -88,7 +97,7 @@ func NeverExpiringExpirationChecker(limits Limits) ExpirationChecker { type neverExpiringExpirationChecker struct{} -func (e *neverExpiringExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) { +func (e *neverExpiringExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) { return false, nil } func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool { diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go index e51a93cbe0ade..d7945ae924b1b 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go @@ -81,9 +81,9 @@ func Test_expirationChecker_Expired(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actual, nonDeletedIntervals := e.Expired(tt.ref, model.Now()) + actual, nonDeletedIntervalFilters := e.Expired(tt.ref, model.Now()) require.Equal(t, tt.want, actual) - require.Nil(t, nonDeletedIntervals) + require.Nil(t, nonDeletedIntervalFilters) }) } } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index a74901c0cf8a2..b74d05748f058 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -151,11 +151,11 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr seriesMap.Add(c.SeriesID, c.UserID, c.Labels) // see if the chunk is deleted completely or partially - if expired, nonDeletedIntervals := expiration.Expired(c, now); expired { - if len(nonDeletedIntervals) > 0 { - wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, nonDeletedIntervals) + if expired, nonDeletedIntervalFilters := expiration.Expired(c, now); expired { + if len(nonDeletedIntervalFilters) > 0 { + wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, nonDeletedIntervalFilters) if err != nil { - return false, false, fmt.Errorf("failed to rewrite chunk %s for interval %s with error %s", c.ChunkID, nonDeletedIntervals, err) + return false, false, fmt.Errorf("failed to rewrite chunk %s for intervals %+v with error %s", c.ChunkID, nonDeletedIntervalFilters, err) } if wroteChunks { @@ -173,7 +173,7 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr // Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in. // For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then // the retention would fail because it would fail to find it in the storage. - if len(nonDeletedIntervals) == 0 || c.Through <= tableInterval.End { + if len(nonDeletedIntervalFilters) == 0 || c.Through <= tableInterval.End { if err := marker.Put(c.ChunkID); err != nil { return false, false, err } @@ -307,7 +307,7 @@ func newChunkRewriter(chunkClient client.Client, schemaCfg config.PeriodConfig, }, nil } -func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, intervals []model.Interval) (bool, error) { +func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, intervalFilters []IntervalFilter) (bool, error) { userID := unsafeGetString(ce.UserID) chunkID := unsafeGetString(ce.ChunkID) @@ -327,8 +327,11 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva wroteChunks := false - for _, interval := range intervals { - newChunkData, err := chks[0].Data.Rebound(interval.Start, interval.End, nil) + for _, ivf := range intervalFilters { + start := ivf.Interval.Start + end := ivf.Interval.End + + newChunkData, err := chks[0].Data.Rebound(start, end, ivf.Filter) if err != nil { return false, err } @@ -341,8 +344,8 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva newChunk := chunk.NewChunk( userID, chks[0].FingerprintModel(), chks[0].Metric, facade, - interval.Start, - interval.End, + start, + end, ) err = newChunk.Encode() @@ -350,7 +353,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva return false, err } - entries, err := c.seriesStoreSchema.GetChunkWriteEntries(interval.Start, interval.End, userID, "logs", newChunk.Metric, c.scfg.ExternalKey(newChunk.ChunkRef)) + entries, err := c.seriesStoreSchema.GetChunkWriteEntries(start, end, userID, "logs", newChunk.Metric, c.scfg.ExternalKey(newChunk.ChunkRef)) if err != nil { return false, err } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index d973fc0a5b6db..c944afae57e9f 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -313,9 +313,9 @@ func TestChunkRewriter(t *testing.T) { minListMarkDelay = 1 * time.Second now := model.Now() for _, tt := range []struct { - name string - chunk chunk.Chunk - rewriteIntervals []model.Interval + name string + chunk chunk.Chunk + rewriteIntervalFilters []IntervalFilter }{ { name: "no rewrites", @@ -328,56 +328,91 @@ func TestChunkRewriter(t *testing.T) { { name: "rewrite first half", chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now), - rewriteIntervals: []model.Interval{ + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-2 * time.Hour), - End: now.Add(-1 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-2 * time.Hour), + End: now.Add(-1 * time.Hour), + }, }, }, }, { name: "rewrite second half", chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now), - rewriteIntervals: []model.Interval{ + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-time.Hour), - End: now, + Interval: model.Interval{ + Start: now.Add(-time.Hour), + End: now, + }, }, }, }, { name: "rewrite multiple intervals", chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-12*time.Hour), now), - rewriteIntervals: []model.Interval{ + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-12 * time.Hour), - End: now.Add(-10 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-12 * time.Hour), + End: now.Add(-10 * time.Hour), + }, }, { - Start: now.Add(-9 * time.Hour), - End: now.Add(-5 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-9 * time.Hour), + End: now.Add(-5 * time.Hour), + }, }, { - Start: now.Add(-2 * time.Hour), - End: now, + Interval: model.Interval{ + Start: now.Add(-2 * time.Hour), + End: now, + }, }, }, }, { name: "rewrite chunk spanning multiple days with multiple intervals", chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-72*time.Hour), now), - rewriteIntervals: []model.Interval{ + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-71 * time.Hour), - End: now.Add(-47 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-71 * time.Hour), + End: now.Add(-47 * time.Hour), + }, + }, + { + Interval: model.Interval{ + Start: now.Add(-40 * time.Hour), + End: now.Add(-30 * time.Hour), + }, }, { - Start: now.Add(-40 * time.Hour), - End: now.Add(-30 * time.Hour), + Interval: model.Interval{ + Start: now.Add(-2 * time.Hour), + End: now, + }, }, + }, + }, + { + name: "remove half of the lines using a filter function", + chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now), + rewriteIntervalFilters: []IntervalFilter{ { - Start: now.Add(-2 * time.Hour), - End: now, + Interval: model.Interval{ + Start: now.Add(-1 * time.Hour), + End: now, + }, + Filter: func(s string) bool { + i, err := strconv.Atoi(s[len(s)-1:]) + if err != nil { + panic(fmt.Sprintf("invalid input to filter function: %v", s)) + } + return i%2 == 0 + }, }, }, }, @@ -401,9 +436,9 @@ func TestChunkRewriter(t *testing.T) { cr, err := newChunkRewriter(chunkClient, store.schemaCfg.Configs[0], indexTable.name, bucket) require.NoError(t, err) - wroteChunks, err := cr.rewriteChunk(context.Background(), entryFromChunk(store.schemaCfg, tt.chunk), tt.rewriteIntervals) + wroteChunks, err := cr.rewriteChunk(context.Background(), entryFromChunk(store.schemaCfg, tt.chunk), tt.rewriteIntervalFilters) require.NoError(t, err) - if len(tt.rewriteIntervals) == 0 { + if len(tt.rewriteIntervalFilters) == 0 { require.False(t, wroteChunks) } return nil @@ -416,9 +451,9 @@ func TestChunkRewriter(t *testing.T) { chunks := store.GetChunks(tt.chunk.UserID, tt.chunk.From, tt.chunk.Through, tt.chunk.Metric) // number of chunks should be the new re-written chunks + the source chunk - require.Len(t, chunks, len(tt.rewriteIntervals)+1) - for _, interval := range tt.rewriteIntervals { - expectedChk := createChunk(t, tt.chunk.UserID, labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, interval.Start, interval.End) + require.Len(t, chunks, len(tt.rewriteIntervalFilters)+1) + for _, ivf := range tt.rewriteIntervalFilters { + expectedChk := createChunk(t, tt.chunk.UserID, labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, ivf.Interval.Start, ivf.Interval.End) for i, chk := range chunks { if store.schemaCfg.ExternalKey(chk.ChunkRef) == store.schemaCfg.ExternalKey(expectedChk.ChunkRef) { chunks = append(chunks[:i], chunks[i+1:]...) @@ -450,8 +485,8 @@ func (s *seriesCleanedRecorder) Cleanup(userID []byte, lbls labels.Labels) error } type chunkExpiry struct { - isExpired bool - nonDeletedIntervals []model.Interval + isExpired bool + nonDeletedIntervalFilters []IntervalFilter } type mockExpirationChecker struct { @@ -463,9 +498,9 @@ func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) mockExpiratio return mockExpirationChecker{chunksExpiry: chunksExpiry} } -func (m mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) { +func (m mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) { ce := m.chunksExpiry[string(ref.ChunkID)] - return ce.isExpired, ce.nonDeletedIntervals + return ce.isExpired, ce.nonDeletedIntervalFilters } func (m mockExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool { @@ -534,9 +569,11 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expiry: []chunkExpiry{ { isExpired: true, - nonDeletedIntervals: []model.Interval{{ - Start: todaysTableInterval.Start, - End: todaysTableInterval.Start.Add(15 * time.Minute), + nonDeletedIntervalFilters: []IntervalFilter{{ + Interval: model.Interval{ + Start: todaysTableInterval.Start, + End: todaysTableInterval.Start.Add(15 * time.Minute), + }, }}, }, }, @@ -586,9 +623,11 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { }, { isExpired: true, - nonDeletedIntervals: []model.Interval{{ - Start: todaysTableInterval.Start, - End: todaysTableInterval.Start.Add(15 * time.Minute), + nonDeletedIntervalFilters: []IntervalFilter{{ + Interval: model.Interval{ + Start: todaysTableInterval.Start, + End: todaysTableInterval.Start.Add(15 * time.Minute), + }, }}, }, }, @@ -610,9 +649,11 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expiry: []chunkExpiry{ { isExpired: true, - nonDeletedIntervals: []model.Interval{{ - Start: todaysTableInterval.Start, - End: now, + nonDeletedIntervalFilters: []IntervalFilter{{ + Interval: model.Interval{ + Start: todaysTableInterval.Start, + End: now, + }, }}, }, }, @@ -634,9 +675,11 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expiry: []chunkExpiry{ { isExpired: true, - nonDeletedIntervals: []model.Interval{{ - Start: todaysTableInterval.Start.Add(-30 * time.Minute), - End: now, + nonDeletedIntervalFilters: []IntervalFilter{{ + Interval: model.Interval{ + Start: todaysTableInterval.Start.Add(-30 * time.Minute), + End: now, + }, }}, }, }, From 5906acc4b565d34632ed0c2b1e112af2fcba70af Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 12 Apr 2022 12:23:43 +0100 Subject: [PATCH 04/19] Fix linting issues Signed-off-by: Michel Hollands --- pkg/storage/stores/shipper/compactor/compactor.go | 7 ++++++- .../stores/shipper/compactor/deletion/validation.go | 3 +-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index d58581678af00..aa6d536f6977d 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -237,7 +237,12 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem return err } c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r) - c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r) + c.deleteRequestsManager = deletion.NewDeleteRequestsManager( + c.deleteRequestsStore, + c.cfg.DeleteRequestCancelPeriod, + r, + c.deleteMode, + ) c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager) } else { c.expirationChecker = newExpirationChecker( diff --git a/pkg/storage/stores/shipper/compactor/deletion/validation.go b/pkg/storage/stores/shipper/compactor/deletion/validation.go index c003ae3414f91..68032e115fa03 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/validation.go +++ b/pkg/storage/stores/shipper/compactor/deletion/validation.go @@ -7,8 +7,7 @@ import ( ) var ( - errInvalidQuery = errors.New("invalid query expression") - errUnsupportedQuery = errors.New("unsupported query expression") + errInvalidQuery = errors.New("invalid query expression") ) // parseDeletionQuery checks if the given logQL is valid for deletions From f4b8060e813dade2b660a93f75c6d623106b0e4f Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Thu, 14 Apr 2022 12:37:01 +0100 Subject: [PATCH 05/19] Enable api for filter and delete mode Signed-off-by: Michel Hollands --- pkg/loki/modules.go | 3 ++- pkg/storage/stores/shipper/compactor/compactor.go | 2 +- pkg/storage/stores/shipper/compactor/retention/retention.go | 5 +++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 389d97de954ed..81d7d1345d616 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -795,7 +795,8 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor) - if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() != deletion.Disabled { + deleteMode := t.compactor.DeleteMode() + if t.Cfg.CompactorConfig.RetentionEnabled && (deleteMode == deletion.WholeStreamDeletion || deleteMode == deletion.FilterAndDelete) { t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler))) t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))) t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index aa6d536f6977d..9275fdf3746b1 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -229,7 +229,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem return err } - if c.deleteMode != deletion.Disabled { + if c.deleteMode == deletion.WholeStreamDeletion || c.deleteMode == deletion.FilterOnly || c.deleteMode == deletion.FilterAndDelete { deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index b74d05748f058..5415924690435 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -333,6 +333,11 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva newChunkData, err := chks[0].Data.Rebound(start, end, ivf.Filter) if err != nil { + if errors.Is(err, chunk.ErrSliceNoDataInRange) { + level.Info(util_log.Logger).Log("msg", "Rebound leaves an empty chunk", "chunk ref", string(ce.ChunkRef.ChunkID)) + // skip empty chunks + continue + } return false, err } From c460772ebf69ed23c062e7420fd402626959316f Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Thu, 14 Apr 2022 12:37:49 +0100 Subject: [PATCH 06/19] Add settings for retention Signed-off-by: Michel Hollands --- cmd/loki/loki-docker-config.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cmd/loki/loki-docker-config.yaml b/cmd/loki/loki-docker-config.yaml index 6d341a3ea3239..cec3a0327b7f1 100644 --- a/cmd/loki/loki-docker-config.yaml +++ b/cmd/loki/loki-docker-config.yaml @@ -39,3 +39,14 @@ ruler: # If you would like to disable reporting, uncomment the following lines: #analytics: # reporting_enabled: false + +ingester: + max_chunk_age: 10m + +compactor: + working_directory: /loki/retention + compaction_interval: 10m + retention_enabled: true + deletion_mode: "filter-and-delete" + retention_delete_delay: 20m + delete_request_cancel_period: 20m From 3f8144e59e288c2d97b02157dd075fcd6def0265 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Thu, 14 Apr 2022 15:48:28 +0100 Subject: [PATCH 07/19] Use labels to check and add test Signed-off-by: Michel Hollands --- .../compactor/deletion/delete_request.go | 12 +++++++++ .../compactor/deletion/delete_request_test.go | 26 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go index 66794837e284a..4463d5d333778 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go @@ -48,6 +48,9 @@ func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error f := p.ForStream(labels).ProcessString return func(s string) bool { + if !allMatch(d.matchers, labels) { + return false + } result, _, skip := f(s) if len(result) != 0 || skip { return true @@ -56,6 +59,15 @@ func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error }, nil } +func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool { + for _, m := range matchers { + if !m.Matches(labels.Get(m.Name)) { + return false + } + } + return true +} + // IsDeleted checks if the given ChunkEntry will be deleted by this DeleteRequest. // It also returns the intervals of the ChunkEntry that will remain. func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []retention.IntervalFilter) { diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go index 7db4267e49842..1df2c426ea197 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go @@ -199,3 +199,29 @@ func mustParseLabel(input string) labels.Labels { return lbls } + +func TestDeleteRequest_FilterFunction(t *testing.T) { + dr := DeleteRequest{ + Query: `{foo="bar"} |= "some"`, + } + + lblStr := `{foo="bar"}` + lbls := mustParseLabel(lblStr) + + f, err := dr.FilterFunction(lbls) + require.NoError(t, err) + + require.True(t, f(`some line`)) + require.False(t, f("")) + require.False(t, f("other line")) + + lblStr = `{foo2="buzz"}` + lbls = mustParseLabel(lblStr) + + f, err = dr.FilterFunction(lbls) + require.NoError(t, err) + + require.False(t, f("")) + require.False(t, f("other line")) + require.False(t, f("some line")) +} From 3965473f35811d0c55b2d28b78532364f02a34d0 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 19 Apr 2022 09:57:00 +0100 Subject: [PATCH 08/19] Simplify filter function Signed-off-by: Michel Hollands --- .../shipper/compactor/deletion/delete_request.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go index 4463d5d333778..db9cb20867bb6 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go @@ -46,16 +46,16 @@ func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error return nil, err } + if !allMatch(d.matchers, labels) { + return func(s string) bool { + return false + }, nil + } + f := p.ForStream(labels).ProcessString return func(s string) bool { - if !allMatch(d.matchers, labels) { - return false - } result, _, skip := f(s) - if len(result) != 0 || skip { - return true - } - return false + return len(result) != 0 || skip }, nil } From 810363ee98800e10aa91fe9cba60ced9846631a7 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 19 Apr 2022 10:32:28 +0100 Subject: [PATCH 09/19] Also enable filter mode Signed-off-by: Michel Hollands --- pkg/loki/modules.go | 16 +++++++++------- .../stores/shipper/compactor/compactor.go | 5 +++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 81d7d1345d616..9d36d1e96a5b6 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -795,13 +795,15 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor) - deleteMode := t.compactor.DeleteMode() - if t.Cfg.CompactorConfig.RetentionEnabled && (deleteMode == deletion.WholeStreamDeletion || deleteMode == deletion.FilterAndDelete) { - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler))) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))) - - t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler))) + if t.Cfg.CompactorConfig.RetentionEnabled { + switch t.compactor.DeleteMode() { + case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete: + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler))) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))) + default: + break + } } return t.compactor, nil diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 9275fdf3746b1..a6e250bae7174 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -229,7 +229,8 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem return err } - if c.deleteMode == deletion.WholeStreamDeletion || c.deleteMode == deletion.FilterOnly || c.deleteMode == deletion.FilterAndDelete { + switch c.deleteMode { + case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete: deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient) @@ -244,7 +245,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem c.deleteMode, ) c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager) - } else { + default: c.expirationChecker = newExpirationChecker( retention.NewExpirationChecker(limits), // This is a dummy deletion ExpirationChecker that never expires anything From 3b9dea1b48ca85c110d8ae541f8bb221aff2adf0 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 19 Apr 2022 10:33:36 +0100 Subject: [PATCH 10/19] Remove test settings in config file for docker Signed-off-by: Michel Hollands --- cmd/loki/loki-docker-config.yaml | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/cmd/loki/loki-docker-config.yaml b/cmd/loki/loki-docker-config.yaml index cec3a0327b7f1..6d341a3ea3239 100644 --- a/cmd/loki/loki-docker-config.yaml +++ b/cmd/loki/loki-docker-config.yaml @@ -39,14 +39,3 @@ ruler: # If you would like to disable reporting, uncomment the following lines: #analytics: # reporting_enabled: false - -ingester: - max_chunk_age: 10m - -compactor: - working_directory: /loki/retention - compaction_interval: 10m - retention_enabled: true - deletion_mode: "filter-and-delete" - retention_delete_delay: 20m - delete_request_cancel_period: 20m From 94ab3847e83b4cb1b88830dc1ac236331a0630fc Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 19 Apr 2022 10:53:07 +0100 Subject: [PATCH 11/19] Add extra (unused) param for ProcessString in filter Signed-off-by: Michel Hollands --- pkg/storage/stores/shipper/compactor/deletion/delete_request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go index db9cb20867bb6..b096128838de5 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go @@ -54,7 +54,7 @@ func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error f := p.ForStream(labels).ProcessString return func(s string) bool { - result, _, skip := f(s) + result, _, skip := f(0, s) return len(result) != 0 || skip }, nil } From 668edba4050d2f052348d9308364a4e9a13cb130 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 19 Apr 2022 11:06:10 +0100 Subject: [PATCH 12/19] Empty commit to trigger CI again Signed-off-by: Michel Hollands From f4704f68386613bb01f30356ffaee18a0437c2f1 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 19 Apr 2022 11:21:48 +0100 Subject: [PATCH 13/19] Update changelog Signed-off-by: Michel Hollands --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ddf5804c904b..58ec6980ebbb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main * [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters. +* [5879](https://github.com/grafana/loki/pull/5879) **MichelHollands**: Remove lines matching delete request expression when using "filter-and-delete" deletion mode. * [5899](https://github.com/grafana/loki/pull/5899) **simonswine**: Update go image to 1.17.9. * [5888](https://github.com/grafana/loki/pull/5888) **Papawy** Fix common config net interface name overwritten by ring common config * [5799](https://github.com/grafana/loki/pull/5799) **cyriltovena** Fix deduping issues when multiple entries with the same timestamp exist. From e3bc524e33c88b81a7900aa798ca63d300ca4a4e Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 19 Apr 2022 14:18:34 +0100 Subject: [PATCH 14/19] Fix flapping test Signed-off-by: Michel Hollands --- .../stores/shipper/compactor/retention/retention_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index c944afae57e9f..62ff331ee22b9 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -398,7 +398,7 @@ func TestChunkRewriter(t *testing.T) { }, }, { - name: "remove half of the lines using a filter function", + name: "remove no lines using a filter function", chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now), rewriteIntervalFilters: []IntervalFilter{ { @@ -407,11 +407,7 @@ func TestChunkRewriter(t *testing.T) { End: now, }, Filter: func(s string) bool { - i, err := strconv.Atoi(s[len(s)-1:]) - if err != nil { - panic(fmt.Sprintf("invalid input to filter function: %v", s)) - } - return i%2 == 0 + return false }, }, }, From 48166fbca66656609be16a338eb0f9eb37e396ac Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 19 Apr 2022 16:04:44 +0100 Subject: [PATCH 15/19] Remove commented out unit tests and add some more Signed-off-by: Michel Hollands --- .../compactor/deletion/validation_test.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/deletion/validation_test.go b/pkg/storage/stores/shipper/compactor/deletion/validation_test.go index 890b49de90a12..fa5486c0c830b 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/validation_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/validation_test.go @@ -25,17 +25,15 @@ func TestParseLogQLExpressionForDeletion(t *testing.T) { require.NoError(t, err) }) - /* syntax.ParseLogSelector does not reject these - t.Run("pipeline expression with label filter ", func(t *testing.T) { - logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"} | json bob="top.params[0]"`) - require.Nil(t, logSelectorExpr) - require.ErrorIs(t, err, errUnsupportedQuery) + t.Run("pipeline expression with multiple line filters", func(t *testing.T) { + logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"} |= "social sec number" |~ "[abd]*" `) + require.NotNil(t, logSelectorExpr) + require.NoError(t, err) }) - t.Run("metrics query", func(t *testing.T) { - logSelectorExpr, err := parseDeletionQuery(`count_over_time({job="mysql"}[5m])`) + t.Run("pipeline expression with invalid line filter", func(t *testing.T) { + logSelectorExpr, err := parseDeletionQuery(`{env="dev", secret="true"} |= social sec number`) require.Nil(t, logSelectorExpr) - require.ErrorIs(t, err, errUnsupportedQuery) + require.ErrorIs(t, err, errInvalidQuery) }) - */ } From c49b28322152bc485321e72b70cbe521bfaa892e Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 26 Apr 2022 08:40:27 +0100 Subject: [PATCH 16/19] Add extra test case for delete request without line filter Signed-off-by: Michel Hollands --- .../compactor/deletion/delete_request_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go index 1df2c426ea197..2d104038e33b9 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go @@ -224,4 +224,19 @@ func TestDeleteRequest_FilterFunction(t *testing.T) { require.False(t, f("")) require.False(t, f("other line")) require.False(t, f("some line")) + + dr = DeleteRequest{ + Query: `{namespace="default"}`, + } + + lblStr = `{namespace="default"}` + lbls = mustParseLabel(lblStr) + + f, err = dr.FilterFunction(lbls) + require.NoError(t, err) + + require.True(t, f(`some line`)) + require.True(t, f("")) + require.True(t, f("other line")) + } From 414eb487ffc288d1d7585ec9f91ffcf841cd5964 Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Tue, 26 Apr 2022 10:00:21 +0100 Subject: [PATCH 17/19] Use chunk bounds Signed-off-by: Michel Hollands --- pkg/storage/stores/shipper/compactor/retention/retention.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index 5415924690435..033bd4195fd7a 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -358,7 +358,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva return false, err } - entries, err := c.seriesStoreSchema.GetChunkWriteEntries(start, end, userID, "logs", newChunk.Metric, c.scfg.ExternalKey(newChunk.ChunkRef)) + entries, err := c.seriesStoreSchema.GetChunkWriteEntries(newChunk.From, newChunk.Through, userID, "logs", newChunk.Metric, c.scfg.ExternalKey(newChunk.ChunkRef)) if err != nil { return false, err } From a8400022668fe3c11b558693a894c4186d518b2b Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Thu, 28 Apr 2022 09:28:10 +0100 Subject: [PATCH 18/19] check if the log selector has a filter if the whole chunk is selected Signed-off-by: Michel Hollands --- .../compactor/deletion/delete_request.go | 31 +++++++++++--- .../compactor/deletion/delete_request_test.go | 41 +++++++++++++++++++ 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go index b096128838de5..3800a2c38b2e3 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go @@ -1,12 +1,14 @@ package deletion import ( + "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/util/filter" + util_log "github.com/grafana/loki/pkg/util/log" ) type DeleteRequest struct { @@ -69,7 +71,7 @@ func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool { } // IsDeleted checks if the given ChunkEntry will be deleted by this DeleteRequest. -// It also returns the intervals of the ChunkEntry that will remain. +// It also returns the intervals of the ChunkEntry that will remain before filtering. func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []retention.IntervalFilter) { if d.UserID != unsafeGetString(entry.UserID) { return false, nil @@ -89,16 +91,33 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []retention return false, nil } + ff, err := d.FilterFunction(entry.Labels) + if err != nil { + // The query in the delete request is checked when added to the table. + // So this error should not occur. + level.Error(util_log.Logger).Log("msg", "unexpected error getting filter function", "err", err) + return false, nil + } + if d.StartTime <= entry.From && d.EndTime >= entry.Through { + // if the logSelectorExpr has a filter part return the chunk boundaries as intervals + if d.logSelectorExpr.HasFilter() { + return true, []retention.IntervalFilter{ + { + Interval: model.Interval{ + Start: entry.From, + End: entry.Through, + }, + Filter: ff, + }, + } + } + + // No filter in the logSelectorExpr so the whole chunk will be deleted return true, nil } intervals := make([]retention.IntervalFilter, 0, 2) - ff, err := d.FilterFunction(entry.Labels) - if err != nil { - // TODO: log this? Handle it different? - return false, nil - } if d.StartTime > entry.From { intervals = append(intervals, retention.IntervalFilter{ diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go index 2d104038e33b9..e609d10555408 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go @@ -17,6 +17,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { user1 := "user1" lbl := `{foo="bar", fizz="buzz"}` + lblWithFilter := `{foo="bar", fizz="buzz"} |= "filter"` chunkEntry := retention.ChunkEntry{ ChunkRef: retention.ChunkRef{ @@ -50,6 +51,26 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { nonDeletedIntervals: nil, }, }, + { + name: "whole chunk deleted with filter present", + deleteRequest: DeleteRequest{ + UserID: user1, + StartTime: now.Add(-3 * time.Hour), + EndTime: now.Add(-time.Hour), + Query: lblWithFilter, + }, + expectedResp: resp{ + isDeleted: true, + nonDeletedIntervals: []retention.IntervalFilter{ + { + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-time.Hour), + }, + }, + }, + }, + }, { name: "chunk deleted from beginning", deleteRequest: DeleteRequest{ @@ -110,6 +131,26 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { }, }, }, + { + name: "chunk deleted from end with filter", + deleteRequest: DeleteRequest{ + UserID: user1, + StartTime: now.Add(-2 * time.Hour), + EndTime: now, + Query: lblWithFilter, + }, + expectedResp: resp{ + isDeleted: true, + nonDeletedIntervals: []retention.IntervalFilter{ + { + Interval: model.Interval{ + Start: now.Add(-3 * time.Hour), + End: now.Add(-2*time.Hour) - 1, + }, + }, + }, + }, + }, { name: "chunk deleted in the middle", deleteRequest: DeleteRequest{ From 6f7ba8667c7b3beccdc7bb9b0d50f8e874ac036d Mon Sep 17 00:00:00 2001 From: Michel Hollands Date: Thu, 28 Apr 2022 09:37:25 +0100 Subject: [PATCH 19/19] fix lint issue: use correct go-kit import Signed-off-by: Michel Hollands --- pkg/storage/stores/shipper/compactor/deletion/delete_request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go index 3800a2c38b2e3..d6e26d472ef51 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go @@ -1,7 +1,7 @@ package deletion import ( - "github.com/go-kit/kit/log/level" + "github.com/go-kit/log/level" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels"