From a159680ca437576bf78f7b08a14715a61f5d3ca9 Mon Sep 17 00:00:00 2001 From: Kartik-Garg Date: Sat, 18 Feb 2023 00:36:30 +0530 Subject: [PATCH 1/2] Repair non-empty XOR chunks during 1h downsampling Iterated through the non empty XOR chunks, stored the results in a buffer and then downsampled them to 5m aggregated chunks by using DownsampleRaw method and then passed them for 1h downsampling. Signed-off-by: Kartik-Garg --- pkg/compact/downsample/downsample.go | 15 ++++++++++++++- pkg/compact/downsample/downsample_test.go | 19 +++++++++++++------ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 07cf294f06..c93c03d7d8 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -167,10 +167,23 @@ func Downsample( // https://github.com/thanos-io/thanos/issues/5272 level.Warn(logger).Log("msg", fmt.Sprintf("expected downsampled chunk (*downsample.AggrChunk) got an empty %T instead for series: %d", c.Chunk, postings.At())) continue + } else { + if err := expandChunkIterator(c.Chunk.Iterator(reuseIt), &all); err != nil { + return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At()) + } + aggrDataChunks := DownsampleRaw(all, ResLevel1) + for _, cn := range aggrDataChunks { + ac, ok = cn.Chunk.(*AggrChunk) + if !ok { + level.Warn(logger).Log("Not able to convert non-empty XOR chunks into 5m downsampled Aggregated chunks") + continue + } + } } - return id, errors.Errorf("expected downsampled chunk (*downsample.AggrChunk) got a non-empty %T instead for series: %d", c.Chunk, postings.At()) + } aggrChunks = append(aggrChunks, ac) + } downsampledChunks, err := downsampleAggr( aggrChunks, diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index d86f610007..53537e876a 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -543,16 +543,21 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) { ser := &series{lset: labels.FromStrings("__name__", "a")} aggr := map[AggrType][]sample{ - AggrCount: {{t: 1587690299999, v: 20}, {t: 1587690599999, v: 20}, {t: 1587690899999, v: 20}, {t: 1587691199999, v: 20}, {t: 1587691499999, v: 20}, {t: 1587691799999, v: 20}, {t: 1587692099999, v: 20}, {t: 1587692399999, v: 20}, {t: 1587692699999, v: 16}, {t: 1587692999999, v: 20}, {t: 1587693299999, v: 20}, {t: 1587693590791, v: 20}}, - AggrSum: {{t: 1587690299999, v: 9.276972e+06}, {t: 1587690599999, v: 9.359861e+06}, {t: 1587690899999, v: 9.447457e+06}, {t: 1587691199999, v: 9.542732e+06}, {t: 1587691499999, v: 9.630379e+06}, {t: 1587691799999, v: 9.715631e+06}, {t: 1587692099999, v: 9.799808e+06}, {t: 1587692399999, v: 9.888117e+06}, {t: 1587692699999, v: 2.98928e+06}, {t: 1587692999999, v: 81592}, {t: 1587693299999, v: 163711}, {t: 1587693590791, v: 255746}}, - AggrMin: {{t: 1587690299999, v: 461968}, {t: 1587690599999, v: 466070}, {t: 1587690899999, v: 470131}, {t: 1587691199999, v: 474913}, {t: 1587691499999, v: 479625}, {t: 1587691799999, v: 483709}, {t: 1587692099999, v: 488036}, {t: 1587692399999, v: 492223}, {t: 1587692699999, v: 75}, {t: 1587692999999, v: 2261}, {t: 1587693299999, v: 6210}, {t: 1587693590791, v: 10464}}, - AggrMax: {{t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 496544}, {t: 1587692999999, v: 6010}, {t: 1587693299999, v: 10242}, {t: 1587693590791, v: 14956}}, - AggrCounter: {{t: 1587690005791, v: 461968}, {t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 498647}, {t: 1587692999999, v: 502554}, {t: 1587693299999, v: 506786}, {t: 1587693590791, v: 511500}, {t: 1587693590791, v: 14956}}, + AggrCount: {{t: 1587690299999, v: 20}}, + AggrSum: {{t: 1587693590791, v: 255746}}, + AggrMin: {{t: 1587690299999, v: 461968}}, + AggrMax: {{t: 1587690299999, v: 465870}}, + AggrCounter: {{t: 1587690005791, v: 461968}}, } raw := chunkenc.NewXORChunk() app, err := raw.Appender() testutil.Ok(t, err) + // this comes in !ok and passes through our newly created funcionality app.Append(1587690005794, 42.5) + //app.Append(1587690005795, 42.6) + // app.Append(1587690005796, 42.7) + // app.Append(1587690005797, 42.8) + // app.Append(1587690005798, 42.9) ser.chunks = append(ser.chunks, encodeTestAggrSeries(aggr), chunks.Meta{ MinTime: math.MaxInt64, MaxTime: math.MinInt64, @@ -563,10 +568,12 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) { mb.addSeries(ser) fakeMeta := &metadata.Meta{} + // target fakeMeta.Thanos.Downsample.Resolution = 300_000 + // already existing resolution id, err := Downsample(logger, fakeMeta, mb, dir, 3_600_000) _ = id - testutil.NotOk(t, err) + testutil.Ok(t, err) } func chunksToSeriesIteratable(t *testing.T, inRaw [][]sample, inAggr []map[AggrType][]sample) *series { From 6fc86a535f28da8559b552f38aceaaa8cd0d389e Mon Sep 17 00:00:00 2001 From: Kartik-Garg Date: Sun, 19 Feb 2023 04:27:09 +0530 Subject: [PATCH 2/2] Making changes in the test cases Added few more test cases to check if the data being downsampled gets converted to aggregated and also a test case to check if non-empty XOR chunks can be iterated through. Made changes in the test cases. Signed-off-by: Kartik-Garg --- CHANGELOG.md | 1 + pkg/compact/downsample/downsample.go | 3 +- pkg/compact/downsample/downsample_test.go | 78 +++++++++++++++++++---- 3 files changed, 67 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8dfefc1a9..4423537567 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6098](https://github.com/thanos-io/thanos/pull/6098) Cache/Redis: upgrade `rueidis` to v0.0.93 to fix potential panic when the client-side caching is disabled. - [#6103](https://github.com/thanos-io/thanos/pull/6103) Mixins(Rule): Fix query for long rule evaluations. - [#6121](https://github.com/thanos-io/thanos/pull/6121) Receive: Deduplicate metamonitoring queries. +- [#6137](https://github.com/thanos-io/thanos/pull/6137) Downsample: Repair of non-empty XOR chunks during 1h downsampling. ### Changed diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index c93c03d7d8..d55c95f449 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -175,8 +175,7 @@ func Downsample( for _, cn := range aggrDataChunks { ac, ok = cn.Chunk.(*AggrChunk) if !ok { - level.Warn(logger).Log("Not able to convert non-empty XOR chunks into 5m downsampled Aggregated chunks") - continue + return id, errors.New("Not able to convert non-empty XOR chunks to 5m downsampled aggregated chunks.") } } } diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 53537e876a..14c4d4dea8 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -538,26 +538,23 @@ func TestDownsampleAggrAndEmptyXORChunks(t *testing.T) { } func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) dir := t.TempDir() - ser := &series{lset: labels.FromStrings("__name__", "a")} aggr := map[AggrType][]sample{ - AggrCount: {{t: 1587690299999, v: 20}}, - AggrSum: {{t: 1587693590791, v: 255746}}, - AggrMin: {{t: 1587690299999, v: 461968}}, - AggrMax: {{t: 1587690299999, v: 465870}}, - AggrCounter: {{t: 1587690005791, v: 461968}}, + AggrCount: {{t: 1587690299999, v: 20}, {t: 1587690599999, v: 20}, {t: 1587690899999, v: 20}}, + AggrSum: {{t: 1587690299999, v: 9.276972e+06}, {t: 1587690599999, v: 9.359861e+06}, {t: 1587693590791, v: 255746}}, + AggrMin: {{t: 1587690299999, v: 461968}, {t: 1587690599999, v: 466070}, {t: 1587690899999, v: 470131}, {t: 1587691199999, v: 474913}}, + AggrMax: {{t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}}, + AggrCounter: {{t: 1587690005791, v: 461968}, {t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}}, } raw := chunkenc.NewXORChunk() app, err := raw.Appender() testutil.Ok(t, err) - // this comes in !ok and passes through our newly created funcionality + app.Append(1587690005794, 42.5) - //app.Append(1587690005795, 42.6) - // app.Append(1587690005796, 42.7) - // app.Append(1587690005797, 42.8) - // app.Append(1587690005798, 42.9) + ser.chunks = append(ser.chunks, encodeTestAggrSeries(aggr), chunks.Meta{ MinTime: math.MaxInt64, MaxTime: math.MinInt64, @@ -568,12 +565,67 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) { mb.addSeries(ser) fakeMeta := &metadata.Meta{} - // target fakeMeta.Thanos.Downsample.Resolution = 300_000 - // already existing resolution id, err := Downsample(logger, fakeMeta, mb, dir, 3_600_000) _ = id testutil.Ok(t, err) + + expected := []map[AggrType][]sample{ + { + AggrCount: {{1587690005794, 20}, {1587690005794, 20}, {1587690005794, 21}}, + AggrSum: {{1587690005794, 9.276972e+06}, {1587690005794, 9.359861e+06}, {1587690005794, 255788.5}}, + AggrMin: {{1587690005794, 461968}, {1587690005794, 466070}, {1587690005794, 470131}, {1587690005794, 42.5}}, + AggrMax: {{1587690005794, 465870}, {1587690005794, 469951}, {1587690005794, 474726}}, + AggrCounter: {{1587690005791, 461968}, {1587690599999, 469951}, {1587690599999, 469951}}, + }, + } + + _, err = metadata.ReadFromDir(filepath.Join(dir, id.String())) + testutil.Ok(t, err) + + indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename)) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, indexr.Close()) }() + + chunkr, err := chunks.NewDirReader(filepath.Join(dir, id.String(), block.ChunksDirname), NewPool()) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, chunkr.Close()) }() + + pall, err := indexr.Postings(index.AllPostingsKey()) + testutil.Ok(t, err) + + var series []storage.SeriesRef + for pall.Next() { + series = append(series, pall.At()) + } + testutil.Ok(t, pall.Err()) + testutil.Equals(t, 1, len(series)) + + var builder labels.ScratchBuilder + var chks []chunks.Meta + testutil.Ok(t, indexr.Series(series[0], &builder, &chks)) + + var got []map[AggrType][]sample + for _, c := range chks { + chk, err := chunkr.Chunk(c) + testutil.Ok(t, err) + + m := map[AggrType][]sample{} + for _, at := range []AggrType{AggrCount, AggrSum, AggrMin, AggrMax, AggrCounter} { + c, err := chk.(*AggrChunk).Get(at) + if err == ErrAggrNotExist { + continue + } + testutil.Ok(t, err) + + buf := m[at] + testutil.Ok(t, expandChunkIterator(c.Iterator(nil), &buf)) + m[at] = buf + } + got = append(got, m) + } + testutil.Equals(t, expected, got) + } func chunksToSeriesIteratable(t *testing.T, inRaw [][]sample, inAggr []map[AggrType][]sample) *series {