diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index f8977925ffeb0..31d88e583acbe 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -2,6 +2,7 @@ package chunkenc import ( "context" + "io" "sort" "time" @@ -107,6 +108,8 @@ func (c *dumbChunk) BytesWith(_ []byte) ([]byte, error) { return nil, nil } +func (c *dumbChunk) WriteTo(w io.Writer) (int64, error) { return 0, nil } + func (c *dumbChunk) Blocks(_ time.Time, _ time.Time) []Block { return nil } diff --git a/pkg/chunkenc/facade.go b/pkg/chunkenc/facade.go index f44db9fe50acb..3b949690b71fb 100644 --- a/pkg/chunkenc/facade.go +++ b/pkg/chunkenc/facade.go @@ -44,12 +44,10 @@ func (f Facade) Marshal(w io.Writer) error { if f.c == nil { return nil } - buf, err := f.c.Bytes() - if err != nil { + if _, err := f.c.WriteTo(w); err != nil { return err } - _, err = w.Write(buf) - return err + return nil } // UnmarshalFromBuf implements encoding.Chunk. diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index de0bfc418fbd2..6b077289c9549 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "strings" "time" @@ -105,6 +106,7 @@ type Chunk interface { Size() int Bytes() ([]byte, error) BytesWith([]byte) ([]byte, error) // uses provided []byte for buffer instantiation + io.WriterTo BlockCount() int Utilization() float64 UncompressedSize() int diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 65c61a52ab70c..6ca2e1da1ddbf 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -257,16 +257,29 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { // BytesWith uses a provided []byte for buffer instantiation func (c *MemChunk) BytesWith(b []byte) ([]byte, error) { + buf := bytes.NewBuffer(b[:0]) + if _, err := c.WriteTo(buf); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// Bytes implements Chunk. +func (c *MemChunk) Bytes() ([]byte, error) { + return c.BytesWith(nil) +} + +// WriteTo Implements io.WriterTo +func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { if c.head != nil { // When generating the bytes, we need to flush the data held in-buffer. if err := c.cut(); err != nil { - return nil, err + return 0, err } } crc32Hash := newCRC32() - buf := bytes.NewBuffer(b[:0]) - offset := 0 + offset := int64(0) eb := encbuf{b: make([]byte, 0, 1<<10)} @@ -278,25 +291,25 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) { eb.putByte(byte(c.encoding)) } - n, err := buf.Write(eb.get()) + n, err := w.Write(eb.get()) if err != nil { - return buf.Bytes(), errors.Wrap(err, "write blockMeta #entries") + return offset, errors.Wrap(err, "write blockMeta #entries") } - offset += n + offset += int64(n) // Write Blocks. for i, b := range c.blocks { - c.blocks[i].offset = offset + c.blocks[i].offset = int(offset) eb.reset() eb.putBytes(b.b) eb.putHash(crc32Hash) - n, err := buf.Write(eb.get()) + n, err := w.Write(eb.get()) if err != nil { - return buf.Bytes(), errors.Wrap(err, "write block") + return offset, errors.Wrap(err, "write block") } - offset += n + offset += int64(n) } metasOffset := offset @@ -317,25 +330,22 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) { } eb.putHash(crc32Hash) - _, err = buf.Write(eb.get()) + n, err = w.Write(eb.get()) if err != nil { - return buf.Bytes(), errors.Wrap(err, "write block metas") + return offset, errors.Wrap(err, "write block metas") } + offset += int64(n) // Write the metasOffset. eb.reset() - eb.putBE64int(metasOffset) - _, err = buf.Write(eb.get()) + eb.putBE64int(int(metasOffset)) + n, err = w.Write(eb.get()) if err != nil { - return buf.Bytes(), errors.Wrap(err, "write metasOffset") + return offset, errors.Wrap(err, "write metasOffset") } + offset += int64(n) - return buf.Bytes(), nil -} - -// Bytes implements Chunk. -func (c *MemChunk) Bytes() ([]byte, error) { - return c.BytesWith(nil) + return offset, nil } // Encoding implements Chunk. diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index efa59d3359f51..a0f2a72b8aafe 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "math/rand" "net/http" "testing" @@ -221,6 +222,8 @@ func (c *noopChunk) BytesWith(_ []byte) ([]byte, error) { return nil, nil } +func (c *noopChunk) WriteTo(w io.Writer) (int64, error) { return 0, nil } + func (c *noopChunk) Blocks(_ time.Time, _ time.Time) []chunkenc.Block { return nil }