-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store/bucket: make getFor() work with interleaved resolutions #1146
Changes from 8 commits
9c94918
cd3363c
00b2c7c
8f22504
e2c46bf
342aede
4e73b2f
fae1929
bf12553
de4b79a
c3abb09
f22cffa
5e9d0e9
420ebe0
e8b3189
bacfd06
93a764c
f4b0a66
dac133e
dbe3727
6dac954
55aa32d
3fe4217
3bbc6cb
3602678
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -675,7 +675,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag | |
// labels and resolution. This is important because we allow mixed resolution results, so it is quite crucial | ||
// to be aware what exactly resolution we see on query. | ||
// TODO(bplotka): Consider adding resolution label to all results to propagate that info to UI and Query API. | ||
func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels.Labels, bs []*bucketBlock) { | ||
func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxresolution int64, lset labels.Labels, bs []*bucketBlock) { | ||
GiedriusS marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if len(bs) == 0 { | ||
level.Debug(logger).Log("msg", "No block found", "mint", mint, "maxt", maxt, "lset", lset.String()) | ||
return | ||
|
@@ -703,7 +703,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels | |
|
||
parts = append(parts, fmt.Sprintf("Range: %d-%d Resolution: %d", currMin, currMax, currRes)) | ||
|
||
level.Debug(logger).Log("msg", "Blocks source resolutions", "blocks", len(bs), "mint", mint, "maxt", maxt, "lset", lset.String(), "spans", strings.Join(parts, "\n")) | ||
level.Debug(logger).Log("msg", "Blocks source resolutions", "blocks", len(bs), "maxresolution", maxresolution, "mint", mint, "maxt", maxt, "lset", lset.String(), "spans", strings.Join(parts, "\n")) | ||
} | ||
|
||
// Series implements the storepb.StoreServer interface. | ||
|
@@ -738,7 +738,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie | |
blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow) | ||
|
||
if s.debugLogging { | ||
debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, bs.labels, blocks) | ||
debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks) | ||
} | ||
|
||
for _, b := range blocks { | ||
|
@@ -1022,6 +1022,8 @@ func (s *bucketBlockSet) getFor(mint, maxt, minResolution int64) (bs []*bucketBl | |
} | ||
// Our current resolution might not cover all data, recursively fill the gaps at the start | ||
// and end of [mint, maxt] with higher resolution blocks. | ||
// | ||
// Plus, fill the possible gaps between the current blocks with higher resolution blocks. | ||
i++ | ||
// No higher resolution left, we are done. | ||
if i >= len(s.resolutions) { | ||
|
@@ -1030,6 +1032,19 @@ func (s *bucketBlockSet) getFor(mint, maxt, minResolution int64) (bs []*bucketBl | |
if len(bs) == 0 { | ||
return s.getFor(mint, maxt, s.resolutions[i]) | ||
} | ||
|
||
until := len(bs) - 1 | ||
for j := 0; j < until; j++ { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's tricky, need to dive more but generally makes sense. Explaining what we are seeing now. Thanks for tests catching this as well! All of this assumes that blocks within different resolutions are aligned ideally right? Can we comment this somehow? Also be aware of this: #1104 Hope you algorthim would be nicely extendable I guess. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They always should be aligned ideally or on the same timestamps because in other cases we would get a problem known as "overlapping blocks". This change should be easy to integrate into that PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, It makes total sense BUT to me Can we remove this left and right and treat them properly in this loop as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea sounds good and I played around with it a bit but I doubt that this function can be made more succinct because we have three distinct cases of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, is it really too complex? I will propose something to your PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Proposed: GiedriusS#1 IMO it's way simpler, what do you think? @GiedriusS // getFor returns a time-ordered list of blocks that cover date between mint and maxt.
// Blocks with the biggest resolution possible but not bigger than the given max resolution are returned.
func (s *bucketBlockSet) getFor(mint, maxt, maxResolutionMillis int64) (bs []*bucketBlock) {
if mint == maxt {
return nil
}
s.mtx.RLock()
defer s.mtx.RUnlock()
// Find first matching resolution.
i := 0
for ; i < len(s.resolutions) && s.resolutions[i] > maxResolutionMillis; i++ {
}
// Fill the given interval with the blocks for the current resolution.
// Our current resolution might not cover all data, so recursively fill the gaps with higher resolution blocks if there is any.
start := mint
for _, b := range s.blocks[i] {
if b.meta.MaxTime <= mint {
continue
}
if b.meta.MinTime >= maxt {
break
}
if i+1 < len(s.resolutions) {
bs = append(bs, s.getFor(start, b.meta.MinTime, s.resolutions[i+1])...)
}
bs = append(bs, b)
start = b.meta.MaxTime
}
if i+1 < len(s.resolutions) {
bs = append(bs, s.getFor(start, maxt, s.resolutions[i+1])...)
}
return bs
} |
||
if bs[j+1].meta.MinTime-bs[j].meta.MaxTime > 0 { | ||
between := s.getFor(bs[j].meta.MaxTime, bs[j+1].meta.MinTime, s.resolutions[i]) | ||
bs = append(bs[:j+1], append(between, bs[j+1:]...)...) | ||
|
||
// Push the iterators further. | ||
j += len(between) | ||
until += len(between) | ||
} | ||
} | ||
|
||
left := s.getFor(mint, bs[0].meta.MinTime, s.resolutions[i]) | ||
right := s.getFor(bs[len(bs)-1].meta.MaxTime, maxt, s.resolutions[i]) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,149 @@ import ( | |
"github.com/prometheus/tsdb/labels" | ||
) | ||
|
||
// TestBucketBlockSet with blocks which have the same time range | ||
// but different resolutions. | ||
func TestBucketBlockSet_Duplicated(t *testing.T) { | ||
defer leaktest.CheckTimeout(t, 10*time.Second)() | ||
|
||
set := newBucketBlockSet(labels.Labels{}) | ||
|
||
type resBlock struct { | ||
mint, maxt int64 | ||
window int64 | ||
} | ||
input := []resBlock{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about joinging those together in form of table test? essentiall add those as test cases to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These ones are a bit special because we are doing a property-based test instead of an ad-hoc one. I have removed the other test cases that I've added before and only left this one in - this already covers the "interleaved resolutions" case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
{window: downsample.ResLevel2, mint: 0, maxt: 100}, | ||
{window: downsample.ResLevel0, mint: 0, maxt: 100}, | ||
{window: downsample.ResLevel1, mint: 0, maxt: 100}, | ||
{window: downsample.ResLevel0, mint: 100, maxt: 200}, | ||
{window: downsample.ResLevel1, mint: 100, maxt: 200}, | ||
{window: downsample.ResLevel2, mint: 100, maxt: 200}, | ||
{window: downsample.ResLevel2, mint: 200, maxt: 300}, | ||
{window: downsample.ResLevel1, mint: 200, maxt: 300}, | ||
} | ||
|
||
for _, in := range input { | ||
var m metadata.Meta | ||
m.Thanos.Downsample.Resolution = in.window | ||
m.MinTime = in.mint | ||
m.MaxTime = in.maxt | ||
|
||
testutil.Ok(t, set.add(&bucketBlock{meta: &m})) | ||
} | ||
|
||
cases := []struct { | ||
mint, maxt int64 | ||
minResolution int64 | ||
res []resBlock | ||
}{ | ||
{ | ||
mint: 0, | ||
maxt: 300, | ||
minResolution: downsample.ResLevel2, | ||
res: []resBlock{ | ||
{window: downsample.ResLevel2, mint: 0, maxt: 100}, | ||
{window: downsample.ResLevel2, mint: 100, maxt: 200}, | ||
{window: downsample.ResLevel2, mint: 200, maxt: 300}, | ||
}, | ||
}, | ||
} | ||
|
||
for i, c := range cases { | ||
t.Logf("case %d", i) | ||
|
||
var exp []*bucketBlock | ||
for _, b := range c.res { | ||
var m metadata.Meta | ||
m.Thanos.Downsample.Resolution = b.window | ||
m.MinTime = b.mint | ||
m.MaxTime = b.maxt | ||
exp = append(exp, &bucketBlock{meta: &m}) | ||
} | ||
res := set.getFor(c.mint, c.maxt, c.minResolution) | ||
testutil.Equals(t, exp, res) | ||
} | ||
} | ||
|
||
// TestBucketBlockSet with blocks with different resolutions | ||
// that interleave between each other. | ||
func TestBucketBlockSet_Interleaved(t *testing.T) { | ||
defer leaktest.CheckTimeout(t, 10*time.Second)() | ||
|
||
set := newBucketBlockSet(labels.Labels{}) | ||
|
||
type resBlock struct { | ||
mint, maxt int64 | ||
window int64 | ||
} | ||
input := []resBlock{ | ||
{window: downsample.ResLevel2, mint: 0, maxt: 50}, | ||
{window: downsample.ResLevel1, mint: 50, maxt: 100}, | ||
{window: downsample.ResLevel2, mint: 100, maxt: 200}, | ||
{window: downsample.ResLevel1, mint: 200, maxt: 300}, | ||
{window: downsample.ResLevel1, mint: 300, maxt: 400}, | ||
{window: downsample.ResLevel1, mint: 400, maxt: 500}, | ||
{window: downsample.ResLevel0, mint: 500, maxt: 600}, | ||
{window: downsample.ResLevel0, mint: 600, maxt: 700}, | ||
} | ||
|
||
for _, in := range input { | ||
var m metadata.Meta | ||
m.Thanos.Downsample.Resolution = in.window | ||
m.MinTime = in.mint | ||
m.MaxTime = in.maxt | ||
|
||
testutil.Ok(t, set.add(&bucketBlock{meta: &m})) | ||
} | ||
|
||
cases := []struct { | ||
mint, maxt int64 | ||
minResolution int64 | ||
res []resBlock | ||
}{ | ||
{ | ||
mint: 0, | ||
maxt: 700, | ||
minResolution: downsample.ResLevel2, | ||
res: []resBlock{ | ||
{window: downsample.ResLevel2, mint: 0, maxt: 50}, | ||
{window: downsample.ResLevel1, mint: 50, maxt: 100}, | ||
{window: downsample.ResLevel2, mint: 100, maxt: 200}, | ||
{window: downsample.ResLevel1, mint: 200, maxt: 300}, | ||
{window: downsample.ResLevel1, mint: 300, maxt: 400}, | ||
{window: downsample.ResLevel1, mint: 400, maxt: 500}, | ||
{window: downsample.ResLevel0, mint: 500, maxt: 600}, | ||
{window: downsample.ResLevel0, mint: 600, maxt: 700}, | ||
}, | ||
}, | ||
{ | ||
mint: 100, | ||
maxt: 400, | ||
minResolution: downsample.ResLevel2, | ||
res: []resBlock{ | ||
{window: downsample.ResLevel2, mint: 100, maxt: 200}, | ||
{window: downsample.ResLevel1, mint: 200, maxt: 300}, | ||
{window: downsample.ResLevel1, mint: 300, maxt: 400}, | ||
}, | ||
}, | ||
} | ||
|
||
for i, c := range cases { | ||
t.Logf("case %d", i) | ||
|
||
var exp []*bucketBlock | ||
for _, b := range c.res { | ||
var m metadata.Meta | ||
m.Thanos.Downsample.Resolution = b.window | ||
m.MinTime = b.mint | ||
m.MaxTime = b.maxt | ||
exp = append(exp, &bucketBlock{meta: &m}) | ||
} | ||
res := set.getFor(c.mint, c.maxt, c.minResolution) | ||
testutil.Equals(t, exp, res) | ||
} | ||
} | ||
|
||
func TestBucketBlockSet_addGet(t *testing.T) { | ||
defer leaktest.CheckTimeout(t, 10*time.Second)() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quite confusing - hard to tell for me if this makes sense (:
This has the same unit as mint, maxt int64 right? Maybe making the underlying variable
maxSourceResolutionMiliseonconds
or keeping it time.Duration as long as possible makes sense?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm.. am I right that is PR was actually wrong? #1144 And we moved to miliseconds there but we are doing exactly the same here and we missed it? 🤔
If yes, let's remove https://github.com/improbable-eng/thanos/blob/2c4f64b1b96907295a7f8e99d8fd64697f0eb12a/pkg/query/api/v1.go#L227 I think it's wrong. All is in time.Duration so moving to miliseconds there does not make sense. It's here that we transition from duration to integer having arbitrary unit (miliseconds) so I think this should stay and last PR should be reverted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And yea, master is broken now in terms of downsampling if I am not wrong?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert PR:#1154
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should move the whole function to return
int64
and in a form appropriate to the caller since as we can see it is hard to make a mental connection between such distant places in the code? Because that's what parsing should do IMHO - we shouldn't do more parsing down the road.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's a bit of an overstatement if it has never worked properly 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure but now it does not work even more, so there is regression (:
Anyway, I just want to avoid confusion, sorry for being quite strict here ):
Now once we had no regression we can actually make that work and nice. Yes, I am fine with:
maxResolutionMilisec int64
name everywhere.Thoughts? (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed that's what I've done. Hopefully it is much more clearer now about what kind of units are being used and that no one in the future will run into this problem like me when I was trying to hunt down one problem but accidentally got caught up in this.