diff --git a/pkg/storage/chunk/client/local/boltdb_index_client.go b/pkg/storage/chunk/client/local/boltdb_index_client.go index c65a498596826..67dc97d6df028 100644 --- a/pkg/storage/chunk/client/local/boltdb_index_client.go +++ b/pkg/storage/chunk/client/local/boltdb_index_client.go @@ -258,69 +258,104 @@ func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, bucketName } func (b *BoltIndexClient) QueryWithCursor(_ context.Context, c *bbolt.Cursor, query index.Query, callback index.QueryPagesCallback) error { - var start []byte - if len(query.RangeValuePrefix) > 0 { - start = []byte(query.HashValue + separator + string(query.RangeValuePrefix)) - } else if len(query.RangeValueStart) > 0 { - start = []byte(query.HashValue + separator + string(query.RangeValueStart)) - } else { - start = []byte(query.HashValue + separator) - } + batch := batchPool.Get().(*cursorBatch) + defer batchPool.Put(batch) + + batch.reset(c, &query) + callback(query, batch) + return nil +} - rowPrefix := []byte(query.HashValue + separator) +var batchPool = sync.Pool{ + New: func() interface{} { + return &cursorBatch{ + start: bytes.NewBuffer(make([]byte, 0, 1024)), + rowPrefix: bytes.NewBuffer(make([]byte, 0, 1024)), + } + }, +} - // sync.WaitGroup is needed to wait for the caller to finish processing all the index entries being streamed - wg := sync.WaitGroup{} - batch := newReadBatch() - defer func() { - batch.done() - wg.Wait() - }() +type cursorBatch struct { + cursor *bbolt.Cursor + query *index.Query + start *bytes.Buffer + rowPrefix *bytes.Buffer + seeked bool - callbackDone := false + currRangeValue []byte + currValue []byte +} + +func (c *cursorBatch) Iterator() index.ReadBatchIterator { + return c +} + +func (c *cursorBatch) nextItem() ([]byte, []byte) { + if !c.seeked { + if len(c.query.RangeValuePrefix) > 0 { + c.start.WriteString(c.query.HashValue) + c.start.WriteString(separator) + c.start.Write(c.query.RangeValuePrefix) + } else if len(c.query.RangeValueStart) > 0 { + c.start.WriteString(c.query.HashValue) + c.start.WriteString(separator) + c.start.Write(c.query.RangeValueStart) + } else { + c.start.WriteString(c.query.HashValue) + c.start.WriteString(separator) + } + c.rowPrefix.WriteString(c.query.HashValue) + c.rowPrefix.WriteString(separator) + c.seeked = true + return c.cursor.Seek(c.start.Bytes()) + } + return c.cursor.Next() +} - for k, v := c.Seek(start); k != nil; k, v = c.Next() { - if !bytes.HasPrefix(k, rowPrefix) { +func (c *cursorBatch) Next() bool { + for k, v := c.nextItem(); k != nil; k, v = c.nextItem() { + if !bytes.HasPrefix(k, c.rowPrefix.Bytes()) { break } - if len(query.RangeValuePrefix) > 0 && !bytes.HasPrefix(k, start) { + if len(c.query.RangeValuePrefix) > 0 && !bytes.HasPrefix(k, c.start.Bytes()) { break } - if len(query.ValueEqual) > 0 && !bytes.Equal(v, query.ValueEqual) { + if len(c.query.ValueEqual) > 0 && !bytes.Equal(v, c.query.ValueEqual) { continue } - // we need to do callback only once to pass the batch iterator - if !callbackDone { - wg.Add(1) - // do the callback in a goroutine to stream back the index entries - go func() { - // wait for callback to finish processing the batch and return - defer wg.Done() - callback(query, batch) - }() - callbackDone = true - } - // make a copy since k, v are only valid for the life of the transaction. // See: https://godoc.org/github.com/boltdb/bolt#Cursor.Seek - rangeValue := make([]byte, len(k)-len(rowPrefix)) - copy(rangeValue, k[len(rowPrefix):]) + rangeValue := make([]byte, len(k)-c.rowPrefix.Len()) + copy(rangeValue, k[c.rowPrefix.Len():]) value := make([]byte, len(v)) copy(value, v) - err := batch.send(singleResponse{ - rangeValue: rangeValue, - value: value, - }) - if err != nil { - return errors.Wrap(err, "failed to send row while processing boltdb index query") - } + c.currRangeValue = rangeValue + c.currValue = value + return true } + return false +} - return nil +func (c *cursorBatch) RangeValue() []byte { + return c.currRangeValue +} + +func (c *cursorBatch) Value() []byte { + return c.currValue +} + +func (c *cursorBatch) reset(cur *bbolt.Cursor, q *index.Query) { + c.currRangeValue = nil + c.currValue = nil + c.seeked = false + c.cursor = cur + c.query = q + c.rowPrefix.Reset() + c.start.Reset() } type TableWrites struct { @@ -359,51 +394,6 @@ func (b *BoltWriteBatch) Add(tableName, hashValue string, rangeValue []byte, val writes.puts[key] = value } -type singleResponse struct { - rangeValue []byte - value []byte -} - -type readBatch struct { - respChan chan singleResponse - curr singleResponse -} - -func newReadBatch() *readBatch { - return &readBatch{respChan: make(chan singleResponse)} -} - -func (r *readBatch) Iterator() index.ReadBatchIterator { - return r -} - -func (r *readBatch) Next() bool { - var ok bool - r.curr, ok = <-r.respChan - return ok -} - -func (r *readBatch) RangeValue() []byte { - return r.curr.rangeValue -} - -func (r *readBatch) Value() []byte { - return r.curr.value -} - -func (r *readBatch) done() { - close(r.respChan) -} - -func (r *readBatch) send(resp singleResponse) error { - select { - case r.respChan <- resp: - return nil - case <-time.After(10 * time.Second): - return errors.New("timed out sending response") - } -} - // Open the database. // Set Timeout to avoid obtaining file lock wait indefinitely. func OpenBoltdbFile(path string) (*bbolt.DB, error) { diff --git a/pkg/storage/chunk/client/local/boltdb_index_client_test.go b/pkg/storage/chunk/client/local/boltdb_index_client_test.go index 82254a44c1757..6e23bdbbaade3 100644 --- a/pkg/storage/chunk/client/local/boltdb_index_client_test.go +++ b/pkg/storage/chunk/client/local/boltdb_index_client_test.go @@ -259,3 +259,50 @@ func TestBoltDB_Writes(t *testing.T) { }) } } + +func Benchmark_Query(b *testing.B) { + tableName := "test" + dirname := b.TempDir() + + indexClient, err := NewBoltDBIndexClient(BoltDBConfig{ + Directory: dirname, + }) + require.NoError(b, err) + + tableClient, err := NewTableClient(dirname) + require.NoError(b, err) + + err = tableClient.CreateTable(context.Background(), config.TableDesc{ + Name: tableName, + }) + require.NoError(b, err) + + batch := indexClient.NewWriteBatch() + batch.Add(tableName, fmt.Sprintf("hash%s", "test"), []byte(fmt.Sprintf("range%s", "value")), nil) + + err = indexClient.BatchWrite(context.Background(), batch) + require.NoError(b, err) + + // try to create the same file which is already existing + err = tableClient.CreateTable(context.Background(), config.TableDesc{ + Name: tableName, + }) + require.NoError(b, err) + + // make sure file content is not modified + entry := index.Query{ + TableName: tableName, + HashValue: fmt.Sprintf("hash%s", "test"), + } + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + err = indexClient.query(context.Background(), entry, func(_ index.Query, read index.ReadBatchResult) bool { + iter := read.Iterator() + for iter.Next() { + } + return true + }) + require.NoError(b, err) + } +}