This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Merge pull request #248 from shubheksha/fix/238
Align series to 8/16 byte padding to increase addressable space
gouthamve authored Jan 13, 2018
2 parents d0982ac + 618bcea commit 8d373c7
Showing 8 changed files with 54 additions and 37 deletions.
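For context, a rough back-of-the-envelope sketch (not code from this commit) of why 16-byte alignment enlarges the addressable index: the fixed 4-byte series references now count 16-byte units instead of single bytes, which matches the limit bump from 4 GiB to 64 GiB in index/index.go below. The constant names here are illustrative only.

```go
package main

import "fmt"

// Illustrative constants only; the real check in index.go uses 16*math.MaxUint32.
const (
	refSpace  = 1 << 32              // offsets reachable by a fixed 4-byte reference
	alignment = 16                   // new series entry alignment (bytes)
	v2Space   = alignment * refSpace // ~64 GiB of index reachable after this change
)

func main() {
	fmt.Printf("v1 cap ~ %d GiB, v2 cap ~ %d GiB\n", refSpace>>30, v2Space>>30)
}
```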
20 changes: 9 additions & 11 deletions block.go
@@ -151,6 +151,9 @@ type BlockMeta struct {

// Information on compactions the block was created from.
Compaction BlockMetaCompaction `json:"compaction"`

// Version of the index format.
Version int `json:"version"`
}

// BlockStats contains stats about contents of a block.
@@ -176,12 +179,6 @@ const (
flagStd = 1
)

type blockMeta struct {
Version int `json:"version"`

*BlockMeta
}

const indexFilename = "index"
const metaFilename = "meta.json"

@@ -193,16 +190,16 @@ func readMetaFile(dir string) (*BlockMeta, error) {
if err != nil {
return nil, err
}
var m blockMeta
var m BlockMeta

if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
if m.Version != 1 {
if m.Version != 1 && m.Version != 2 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}

return m.BlockMeta, nil
return &m, nil
}

func writeMetaFile(dir string, meta *BlockMeta) error {
@@ -219,7 +216,8 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
enc.SetIndent("", "\t")

var merr MultiError
if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil {

if merr.Add(enc.Encode(meta)); merr.Err() != nil {
merr.Add(f.Close())
return merr.Err()
}
@@ -255,7 +253,7 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
if err != nil {
return nil, err
}
ir, err := index.NewFileReader(filepath.Join(dir, "index"))
ir, err := index.NewFileReader(filepath.Join(dir, "index"), meta.Version)
if err != nil {
return nil, err
}
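A minimal, self-contained sketch of the meta.json change above, assuming only the field this diff touches (the real BlockMeta carries more fields): the index version is now a plain field on BlockMeta instead of living on the dropped blockMeta wrapper, and readers accept versions 1 and 2.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in carrying only the field this diff adds.
type BlockMeta struct {
	Version int `json:"version"`
}

func main() {
	// Written by writeMetaFile-style code: the meta struct is encoded directly.
	b, _ := json.Marshal(&BlockMeta{Version: 2})
	fmt.Println(string(b)) // {"version":2}

	// Read back by readMetaFile-style code, which now accepts versions 1 and 2.
	var m BlockMeta
	_ = json.Unmarshal(b, &m)
	fmt.Println(m.Version == 1 || m.Version == 2) // true
}
```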
2 changes: 1 addition & 1 deletion block_test.go
@@ -42,7 +42,7 @@ func TestSetCompactionFailed(t *testing.T) {
func createEmptyBlock(t *testing.T, dir string) *Block {
testutil.Ok(t, os.MkdirAll(dir, 0777))

testutil.Ok(t, writeMetaFile(dir, &BlockMeta{}))
testutil.Ok(t, writeMetaFile(dir, &BlockMeta{Version: 2}))

ir, err := index.NewWriter(filepath.Join(dir, indexFilename))
testutil.Ok(t, err)
1 change: 1 addition & 0 deletions compact.go
@@ -428,6 +428,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}

indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename))
meta.Version = indexw.Version
if err != nil {
return errors.Wrap(err, "open index writer")
}
File renamed without changes.
2 changes: 1 addition & 1 deletion Documentation/format/index.md → docs/format/index.md
@@ -65,7 +65,7 @@ Strings are referenced by pointing to the beginning of their length field. The s
### Series

The section contains a sequence of series that hold the label set of the series as well as its chunks within the block. The series are sorted lexicographically by their label sets.
The file offset to the beginning of a series serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets.
Each series section is aligned to 16 bytes. The ID for a series is the `offset/16`. This serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets.

```
┌───────────────────────────────────────┐
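For illustration, a small sketch of the offset/ID mapping described in the paragraph above (helper names are hypothetical, not part of the index package):

```go
package main

import "fmt"

// Hypothetical helpers: in the v2 format every series entry starts on a
// 16-byte boundary, and its ID is that starting offset divided by 16.
func seriesIDFromOffset(offset uint64) uint64 { return offset / 16 }
func offsetFromSeriesID(id uint64) uint64     { return id * 16 }

func main() {
	fmt.Println(seriesIDFromOffset(4096)) // a series starting at byte 4096 has ID 256
	fmt.Println(offsetFromSeriesID(256))  // readers multiply by 16 to get back to 4096
}
```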
File renamed without changes.
58 changes: 38 additions & 20 deletions index/index.go
@@ -98,7 +98,7 @@ func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable)
}

// indexWriter implements the IndexWriter interface for the standard
// Writer implements the IndexWriter interface for the standard
// serialization format.
type Writer struct {
f *os.File
@@ -122,6 +122,8 @@ type Writer struct {
lastSeries labels.Labels

crc32 hash.Hash

Version int
}

type indexTOC struct {
@@ -166,6 +168,8 @@ func NewWriter(fn string) (*Writer, error) {
symbols: make(map[string]uint32, 1<<13),
seriesOffsets: make(map[uint64]uint64, 1<<16),
crc32: newCRC32(),

Version: 2,
}
if err := iw.writeMeta(); err != nil {
return nil, err
@@ -180,12 +184,12 @@ func (w *Writer) write(bufs ...[]byte) error {
if err != nil {
return err
}
// For now the index file must not grow beyond 4GiB. Some of the fixed-sized
// For now the index file must not grow beyond 64GiB. Some of the fixed-sized
// offset references in v1 are only 4 bytes large.
// Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
if w.pos > math.MaxUint32 {
return errors.Errorf("exceeding max size of 4GiB")
if w.pos > 16*math.MaxUint32 {
return errors.Errorf("exceeding max size of 64GiB")
}
}
return nil
@@ -250,6 +254,7 @@ func (w *Writer) writeMeta() error {
return w.write(w.buf1.get())
}

// AddSeries adds the series one at a time along with its chunks.
func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error {
if err := w.ensureStage(idxStageSeries); err != nil {
return err
@@ -261,7 +266,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
if _, ok := w.seriesOffsets[ref]; ok {
return errors.Errorf("series with reference %d already added", ref)
}
w.seriesOffsets[ref] = w.pos
w.addPadding(16)
w.seriesOffsets[ref] = w.pos / 16

w.buf2.reset()
w.buf2.putUvarint(len(lset))
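A minimal sketch of the writer-side alignment introduced above, assuming plausible semantics for the addPadding call (its actual implementation is not shown in this diff): pad the write position up to the next multiple of 16, then record pos/16 as the series reference.

```go
package main

import "fmt"

// pad returns the aligned position and the zero bytes a writer would emit to
// reach it; purely illustrative, not the index package's addPadding.
func pad(pos, size uint64) (uint64, []byte) {
	rem := pos % size
	if rem == 0 {
		return pos, nil // already aligned
	}
	return pos + size - rem, make([]byte, size-rem)
}

func main() {
	pos, fill := pad(1000, 16)
	fmt.Println(pos, len(fill), pos/16) // 1008 8 63 — the stored series reference is 63
}
```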
@@ -531,9 +537,11 @@ type Reader struct {
// the block has been unmapped.
symbols map[uint32]string

dec *DecoderV1
dec *Decoder

crc32 hash.Hash32

version int
}

var (
@@ -563,27 +571,33 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
}

// NewReader returns a new IndexReader on the given byte slice.
func NewReader(b ByteSlice) (*Reader, error) {
return newReader(b, nil)
func NewReader(b ByteSlice, version int) (*Reader, error) {
return newReader(b, nil, version)
}

// NewFileReader returns a new index reader against the given index file.
func NewFileReader(path string) (*Reader, error) {
func NewFileReader(path string, version int) (*Reader, error) {
f, err := fileutil.OpenMmapFile(path)
if err != nil {
return nil, err
}
return newReader(realByteSlice(f.Bytes()), f)
return newReader(realByteSlice(f.Bytes()), f, version)
}

func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
func newReader(b ByteSlice, c io.Closer, version int) (*Reader, error) {
r := &Reader{
b: b,
c: c,
symbols: map[uint32]string{},
labels: map[string]uint32{},
postings: map[labels.Label]uint32{},
crc32: newCRC32(),
version: version,
}

if version != 1 && version != 2 {
return nil, errors.Errorf("unexpected file version %d", version)

}
// Verify magic number.
if b.Len() < 4 {
@@ -622,7 +636,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
return nil, errors.Wrap(err, "read postings table")
}

r.dec = &DecoderV1{symbols: r.symbols}
r.dec = &Decoder{symbols: r.symbols}

return r, nil
}
@@ -852,9 +866,13 @@ func (r *Reader) LabelIndices() ([][]string, error) {
return res, nil
}

// Series the series with the given ID and writes its labels and chunks into lbls and chks.
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
d := r.decbufUvarintAt(int(id))
offset := id
if r.version == 2 {
offset = 16 * id
}
d := r.decbufUvarintAt(int(offset))
if d.err() != nil {
return d.err()
}
@@ -955,15 +973,15 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
return res, nil
}

// DecoderV1 provides decoding methods for the v1 index file format.
// Decoder provides decoding methods for the v1 and v2 index file format.
//
// It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand.
type DecoderV1 struct {
type Decoder struct {
symbols map[uint32]string
}

func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) {
func (dec *Decoder) lookupSymbol(o uint32) (string, error) {
s, ok := dec.symbols[o]
if !ok {
return "", errors.Errorf("unknown symbol offset %d", o)
@@ -973,20 +991,20 @@ func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) {

// SetSymbolTable set the symbol table to be used for lookups when decoding series
// and label indices
func (dec *DecoderV1) SetSymbolTable(t map[uint32]string) {
func (dec *Decoder) SetSymbolTable(t map[uint32]string) {
dec.symbols = t
}

// Postings returns a postings list for b and its number of elements.
func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) {
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
d := decbuf{b: b}
n := d.be32int()
l := d.get()
return n, newBigEndianPostings(l), d.err()
}

// Series decodes a series entry from the given byte slice into lset and chks.
func (dec *DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]

8 changes: 4 additions & 4 deletions index/index_test.go
@@ -160,7 +160,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, iw.Close())

ir, err := NewFileReader(fn)
ir, err := NewFileReader(fn, 1)
testutil.Ok(t, err)
testutil.Ok(t, ir.Close())

@@ -170,7 +170,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
_, err = f.WriteAt([]byte{0, 0}, 0)
testutil.Ok(t, err)

_, err = NewFileReader(dir)
_, err = NewFileReader(dir, 1)
testutil.NotOk(t, err)
}

@@ -213,7 +213,7 @@ func TestIndexRW_Postings(t *testing.T) {

testutil.Ok(t, iw.Close())

ir, err := NewFileReader(fn)
ir, err := NewFileReader(fn, 2)
testutil.Ok(t, err)

p, err := ir.Postings("a", "1")
@@ -331,7 +331,7 @@ func TestPersistence_index_e2e(t *testing.T) {
err = iw.Close()
testutil.Ok(t, err)

ir, err := NewFileReader(filepath.Join(dir, "index"))
ir, err := NewFileReader(filepath.Join(dir, "index"), 2)
testutil.Ok(t, err)

for p := range mi.postings {
