Skip to content

Commit

Permalink
Merge pull request grafana#1216 from cortexproject/bigchunk-slice
Browse files Browse the repository at this point in the history
More efficient memory layout of bigchunk chunks
  • Loading branch information
tomwilkie authored Feb 5, 2019
2 parents 9200617 + b72e53f commit ea011a6
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions encoding/bigchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ const samplesPerChunk = 120

var errOutOfBounds = errors.New("out of bounds")

// smallChunk bundles a single chunkenc.Chunk with the time range it covers,
// so that bigchunk can keep the chunk and its bounds together in one slice
// element.
type smallChunk struct {
// Embedded chunk that stores the samples themselves.
chunkenc.Chunk
// start is the lower time bound of the chunk, as an int64 timestamp.
start int64
// end is the upper time bound of the chunk; it is initialised to start when
// the chunk is created and Add advances it to each appended sample's timestamp.
end int64
}

// bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no
// upper bound on the number of samples it can contain.
type bigchunk struct {
chunks []chunkenc.Chunk
starts []int64
ends []int64
chunks []smallChunk

appender chunkenc.Appender
remainingSamples int
Expand All @@ -41,7 +45,7 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {

b.appender.Append(int64(sample.Timestamp), float64(sample.Value))
b.remainingSamples--
b.ends[len(b.ends)-1] = int64(sample.Timestamp)
b.chunks[len(b.chunks)-1].end = int64(sample.Timestamp)
return []Chunk{b}, nil
}

Expand All @@ -50,14 +54,14 @@ func (b *bigchunk) addNextChunk(start model.Time) error {
// To save memory, we "compact" the previous chunk - the array backing the slice
// will be up to 2x too big, and we can save this space.
if l := len(b.chunks); l > 0 {
c := b.chunks[l-1]
c := b.chunks[l-1].Chunk
buf := make([]byte, len(c.Bytes()))
copy(buf, c.Bytes())
compacted, err := chunkenc.FromData(chunkenc.EncXOR, buf)
if err != nil {
return err
}
b.chunks[l-1] = compacted
b.chunks[l-1].Chunk = compacted
}

chunk := chunkenc.NewXORChunk()
Expand All @@ -66,9 +70,11 @@ func (b *bigchunk) addNextChunk(start model.Time) error {
return err
}

b.starts = append(b.starts, int64(start))
b.ends = append(b.ends, int64(start))
b.chunks = append(b.chunks, chunk)
b.chunks = append(b.chunks, smallChunk{
Chunk: chunk,
start: int64(start),
end: int64(start),
})

b.appender = appender
b.remainingSamples = samplesPerChunk
Expand Down Expand Up @@ -104,7 +110,7 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
return err
}

b.chunks = make([]chunkenc.Chunk, 0, numChunks)
b.chunks = make([]smallChunk, 0, numChunks)
for i := uint16(0); i < numChunks; i++ {
chunkLen, err := r.ReadUint16()
if err != nil {
Expand All @@ -126,9 +132,11 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
return err
}

b.chunks = append(b.chunks, chunk)
b.starts = append(b.starts, start)
b.ends = append(b.ends, end)
b.chunks = append(b.chunks, smallChunk{
Chunk: chunk,
start: int64(start),
end: int64(end),
})
}
return nil
}
Expand Down Expand Up @@ -167,18 +175,16 @@ func (b *bigchunk) NewIterator() Iterator {
func (b *bigchunk) Slice(start, end model.Time) Chunk {
i, j := 0, len(b.chunks)
for k := 0; k < len(b.chunks); k++ {
if b.ends[k] < int64(start) {
if b.chunks[k].end < int64(start) {
i = k + 1
}
if b.starts[k] > int64(end) {
if b.chunks[k].start > int64(end) {
j = k
break
}
}
return &bigchunk{
chunks: b.chunks[i:j],
starts: b.starts[i:j],
ends: b.ends[i:j],
}
}

Expand Down Expand Up @@ -230,9 +236,9 @@ func (it *bigchunkIterator) FindAtOrAfter(target model.Time) bool {

// If the seek is outside the current chunk, use the index to find the right
// chunk.
if int64(target) < it.starts[it.i] || int64(target) > it.ends[it.i] {
if int64(target) < it.chunks[it.i].start || int64(target) > it.chunks[it.i].end {
it.curr = nil
for it.i = 0; it.i < len(it.chunks) && int64(target) > it.ends[it.i]; it.i++ {
for it.i = 0; it.i < len(it.chunks) && int64(target) > it.chunks[it.i].end; it.i++ {
}
}

Expand Down

0 comments on commit ea011a6

Please sign in to comment.