Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Pebble checkpoint ingestion #4727

Merged
merged 38 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
418fe33
initial commit
koko1123 Sep 19, 2023
71599fb
correct setup
koko1123 Sep 19, 2023
6a32d16
initial commit
koko1123 Sep 19, 2023
e1c153b
correct setup
koko1123 Sep 19, 2023
518bb02
clarify structure so we can work in parallel if needed
koko1123 Sep 19, 2023
5dbe07a
merge
koko1123 Sep 19, 2023
21fa90f
stub implementation
koko1123 Sep 19, 2023
b5b76eb
add docs
koko1123 Sep 19, 2023
4460fe0
merge with master
koko1123 Sep 25, 2023
09ed228
tests
koko1123 Sep 25, 2023
149c17c
fix tests
koko1123 Sep 25, 2023
bc71659
create error case
koko1123 Sep 25, 2023
ab271ec
adjust pebble tests for context
koko1123 Sep 25, 2023
bfec254
add ctx.Done() check
koko1123 Sep 25, 2023
c8c1811
fix checkpoint opening and worker race condition
koko1123 Sep 26, 2023
93a52dd
make skeletons for remaining tests
koko1123 Sep 27, 2023
7197fbb
fix helper functions
koko1123 Sep 27, 2023
8411e43
remainign test cases
koko1123 Sep 27, 2023
70185be
complete failure test
koko1123 Sep 27, 2023
1648f3f
update batch tests and godocs
koko1123 Sep 28, 2023
7584015
add corrupted file check and debug logs
koko1123 Sep 28, 2023
8474efd
initial changes per feedback
koko1123 Sep 28, 2023
1c99e16
Merge branch 'master' of github.com:onflow/flow-go into amlandeep/peb…
koko1123 Sep 28, 2023
cde601c
changes after merge with master
koko1123 Sep 28, 2023
b869d24
lint
koko1123 Sep 28, 2023
673d08a
cleanup
koko1123 Sep 28, 2023
d52e72b
Merge branch 'master' into amlandeep/pebble-checkpoint-ingestion
peterargue Sep 29, 2023
6746ec3
Merge branch 'master' of github.com:onflow/flow-go into amlandeep/peb…
koko1123 Sep 29, 2023
acf0a3f
Merge branch 'master' into amlandeep/pebble-checkpoint-ingestion
peterargue Sep 29, 2023
e2ca59b
incorporate open DB interface
koko1123 Sep 29, 2023
5b3833c
Merge branch 'amlandeep/pebble-checkpoint-ingestion' of github.com:on…
koko1123 Sep 29, 2023
15ffb8d
lint
koko1123 Sep 29, 2023
199bfe0
lint
koko1123 Sep 29, 2023
0423af7
defer cancel
koko1123 Sep 29, 2023
e32e912
add sentinel error for pre-bootstrapped pebble
koko1123 Sep 29, 2023
94db5b2
add cleanup fn for pebble
koko1123 Sep 29, 2023
47f4ad9
undo cleanup it's closing the pebble instance before usage
koko1123 Sep 29, 2023
0532094
add final rmdir
koko1123 Sep 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions storage/pebble/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package pebble

import (
"context"
"fmt"
"path/filepath"

"github.com/cockroachdb/pebble"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"

"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/ledger/complete/wal"
)

type RegisterBootstrap struct {
checkpointDir string
checkpointFileName string
log zerolog.Logger
db *pebble.DB
leafNodeChan chan *wal.LeafNode
rootHeight uint64
}

// NewRegisterBootstrap creates the bootstrap object for reading checkpoint data and the height tracker in pebble
// This object must be initialized and RegisterBootstrap.IndexCheckpointFile must be run to have the pebble db instance
// in the correct state to initialize a Registers store.
func NewRegisterBootstrap(
db *pebble.DB,
checkpointFile string,
rootHeight uint64,
log zerolog.Logger,
) (*RegisterBootstrap, error) {
// check for pre-populated heights, fail if it is populated
// i.e. the IndexCheckpointFile function has already run for the db in this directory
checkpointDir, checkpointFileName := filepath.Split(checkpointFile)
peterargue marked this conversation as resolved.
Show resolved Hide resolved
_, _, err := db.Get(latestHeightKey())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err == nil {
// key detected, attempt to run bootstrap on corrupt or already bootstrapped data
return nil, fmt.Errorf("found latest key set on badger instance, cannot bootstrap populated DB")
}
err = db.Set(firstHeightKey(), encodedUint64(rootHeight), nil)
peterargue marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("could not index first height key to initialize: %w", err)
}
return &RegisterBootstrap{
checkpointDir: checkpointDir,
checkpointFileName: checkpointFileName,
log: log.With().Str("module", "register_bootstrap").Logger(),
db: db,
leafNodeChan: make(chan *wal.LeafNode, checkpointLeafNodeBufSize),
rootHeight: rootHeight,
}, nil
}

func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error {
b.log.Debug().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes")
batch := b.db.NewBatch()
defer batch.Close()
for _, register := range leafNodes {
payload := register.Payload
key, err := payload.Key()
if err != nil {
return fmt.Errorf("could not get key from register payload: %w", err)
}

registerID, err := convert.LedgerKeyToRegisterID(key)
if err != nil {
return fmt.Errorf("could not get register ID from key: %w", err)
}

encoded := newLookupKey(b.rootHeight, registerID).Bytes()
err = batch.Set(encoded, payload.Value(), nil)
if err != nil {
return fmt.Errorf("failed to set key: %w", err)
}
}
err := batch.Commit(pebble.Sync)
if err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
return nil
}

// indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir
// with wal.OpenAndReadLeafNodesFromCheckpointV6
func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error {
b.log.Info().Msg("started checkpoint index worker")
// collect leaf nodes to batch index until the channel is closed
batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen)
for leafNode := range b.leafNodeChan {
devbugging marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-ctx.Done():
return nil
default:
batch = append(batch, leafNode)
if len(batch) >= pebbleBootstrapRegisterBatchLen {
err := b.batchIndexRegisters(batch)
if err != nil {
return fmt.Errorf("unable to index registers to pebble in batch: %w", err)
}
batch = make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen)
}
}
}
// index the remaining registers if didn't reach a batch length.
err := b.batchIndexRegisters(batch)
peterargue marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("unable to index remaining registers to pebble: %w", err)
}
return nil
}

// IndexCheckpointFile indexes the checkpoint file in the Dir provided
func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error {
g, gCtx := errgroup.WithContext(ctx)
b.log.Info().Msg("indexing checkpoint file for pebble register store")
for i := 0; i < pebbleBootstrapWorkerCount; i++ {
g.Go(func() error {
return b.indexCheckpointFileWorker(gCtx)
})
}
err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log)
if err != nil {
return fmt.Errorf("error reading leaf node: %w", err)
peterargue marked this conversation as resolved.
Show resolved Hide resolved
}
if err = g.Wait(); err != nil {
return fmt.Errorf("failed to index checkpoint file: %w", err)
}
b.log.Info().Msg("checkpoint indexing complete")
err = b.db.Set(latestHeightKey(), encodedUint64(b.rootHeight), nil)
peterargue marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("could not index latest height: %w", err)
}
return nil
}
262 changes: 262 additions & 0 deletions storage/pebble/bootstrap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package pebble

import (
"context"
"encoding/binary"
"fmt"
"io"
"os"
"path"
"testing"

"github.com/cockroachdb/pebble"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/ledger/common/testutils"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage/pebble/registers"
"github.com/onflow/flow-go/utils/unittest"
)

const defaultRegisterValue = byte('v')

func TestRegisterBootstrap_NewBootstrap(t *testing.T) {
t.Parallel()
unittest.RunWithTempDir(t, func(dir string) {
rootHeight := uint64(1)
log := zerolog.New(io.Discard)
cache := pebble.NewCache(1 << 20)
defer cache.Unref()
opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer())
p, err := pebble.Open(dir, opts)
require.NoError(t, err)
// set heights
require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil))
require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil))
// errors if FirstHeight or LastHeight are populated
_, err = NewRegisterBootstrap(p, dir, rootHeight, log)
require.ErrorContains(t, err, "cannot bootstrap populated DB")
})
}

func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) {
t.Parallel()
log := zerolog.New(io.Discard)
rootHeight := uint64(10000)
unittest.RunWithTempDir(t, func(dir string) {
tries, registerIDs := simpleTrieWithValidRegisterIDs(t)
fileName := "simple-checkpoint"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint")
checkpointFile := path.Join(dir, fileName)

cache := pebble.NewCache(1 << 20)
opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer())
defer cache.Unref()
pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts)

bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.NoError(t, err)

// create registers instance and check values
reg, err := NewRegisters(pb)
require.NoError(t, err)

require.Equal(t, reg.LatestHeight(), rootHeight)
require.Equal(t, reg.FirstHeight(), rootHeight)

for _, register := range registerIDs {
val, err := reg.Get(*register, rootHeight)
require.NoError(t, err)
require.Equal(t, val, []byte{defaultRegisterValue})
}

require.NoError(t, pb.Close())
require.NoError(t, os.RemoveAll(dbDir))
peterargue marked this conversation as resolved.
Show resolved Hide resolved
})
}

func TestRegisterBootstrap_IndexCheckpointFile_Empty(t *testing.T) {
t.Parallel()
log := zerolog.New(io.Discard)
rootHeight := uint64(10000)
unittest.RunWithTempDir(t, func(dir string) {
tries := []*trie.MTrie{trie.NewEmptyMTrie()}
fileName := "empty-checkpoint"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint")
checkpointFile := path.Join(dir, fileName)
cache := pebble.NewCache(1 << 20)
opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer())
defer cache.Unref()
pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts)
bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.NoError(t, err)

// create registers instance and check values
reg, err := NewRegisters(pb)
require.NoError(t, err)

require.Equal(t, reg.LatestHeight(), rootHeight)
require.Equal(t, reg.FirstHeight(), rootHeight)

require.NoError(t, pb.Close())
require.NoError(t, os.RemoveAll(dbDir))
})
}

func TestRegisterBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) {
t.Parallel()
pa1 := testutils.PathByUint8(0)
pa2 := testutils.PathByUint8(1)
rootHeight := uint64(666)
pl1 := testutils.LightPayload8('A', 'A')
pl2 := testutils.LightPayload('B', 'B')
paths := []ledger.Path{pa1, pa2}
payloads := []ledger.Payload{*pl1, *pl2}
emptyTrie := trie.NewEmptyMTrie()
trieWithInvalidEntry, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true)
require.NoError(t, err)
log := zerolog.New(io.Discard)

unittest.RunWithTempDir(t, func(dir string) {
fileName := "invalid-checkpoint"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently([]*trie.MTrie{trieWithInvalidEntry}, dir, fileName, log),
"fail to store checkpoint")
checkpointFile := path.Join(dir, fileName)
cache := pebble.NewCache(1 << 20)
opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer())
defer cache.Unref()

pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts)
bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.ErrorContains(t, err, "unexpected ledger key format")
require.NoError(t, pb.Close())
require.NoError(t, os.RemoveAll(dbDir))
})

}

func TestRegisterBootstrap_IndexCheckpointFile_CorruptedCheckpointFile(t *testing.T) {
t.Parallel()
rootHeight := uint64(666)
log := zerolog.New(io.Discard)
unittest.RunWithTempDir(t, func(dir string) {
tries, _ := largeTrieWithValidRegisterIDs(t)
checkpointFileName := "large-checkpoint-incomplete"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, checkpointFileName, log), "fail to store checkpoint")
// delete 2nd part of the file (2nd subtrie)
fileToDelete := path.Join(dir, fmt.Sprintf("%v.%03d", checkpointFileName, 2))
err := os.RemoveAll(fileToDelete)
require.NoError(t, err)
cache := pebble.NewCache(1 << 20)
opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer())
defer cache.Unref()
pb, _ := unittest.TempPebbleDBWithOpts(t, opts)
bootstrap, err := NewRegisterBootstrap(pb, checkpointFileName, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.ErrorIs(t, err, os.ErrNotExist)
})
}

func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) {
t.Parallel()
log := zerolog.New(io.Discard)
rootHeight := uint64(10000)
unittest.RunWithTempDir(t, func(dir string) {
tries, registerIDs := largeTrieWithValidRegisterIDs(t)
fileName := "large-checkpoint"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint")
checkpointFile := path.Join(dir, fileName)
cache := pebble.NewCache(1 << 20)
opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer())
defer cache.Unref()
pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts)
bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.NoError(t, err)

// create registers instance and check values
reg, err := NewRegisters(pb)
require.NoError(t, err)

require.Equal(t, reg.LatestHeight(), rootHeight)
require.Equal(t, reg.FirstHeight(), rootHeight)

for _, register := range registerIDs {
val, err := reg.Get(*register, rootHeight)
require.NoError(t, err)
require.Equal(t, val, []byte{defaultRegisterValue})
}

require.NoError(t, pb.Close())
require.NoError(t, os.RemoveAll(dbDir))
})

}

func simpleTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.RegisterID) {
return trieWithValidRegisterIDs(t, 2)
}

func largeTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.RegisterID) {
// large enough trie so every worker should have something to index
largeTrieSize := 2 * pebbleBootstrapRegisterBatchLen * pebbleBootstrapWorkerCount
return trieWithValidRegisterIDs(t, uint16(largeTrieSize))
}

func trieWithValidRegisterIDs(t *testing.T, n uint16) ([]*trie.MTrie, []*flow.RegisterID) {
emptyTrie := trie.NewEmptyMTrie()
resultRegisterIDs := make([]*flow.RegisterID, 0, n)
paths := randomRegisterPaths(n)
payloads := randomRegisterPayloads(n)
for _, payload := range payloads {
key, err := payload.Key()
require.NoError(t, err)
regID, err := convert.LedgerKeyToRegisterID(key)
require.NoError(t, err)
resultRegisterIDs = append(resultRegisterIDs, &regID)
}
populatedTrie, depth, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true)
// make sure it has at least 1 leaf node
require.GreaterOrEqual(t, depth, uint16(1))
require.NoError(t, err)
resultTries := []*trie.MTrie{emptyTrie, populatedTrie}
return resultTries, resultRegisterIDs
}

func randomRegisterPayloads(n uint16) []ledger.Payload {
p := make([]ledger.Payload, 0, n)
for i := uint16(0); i < n; i++ {
o := make([]byte, 0, 8)
o = binary.BigEndian.AppendUint16(o, n)
k := ledger.Key{KeyParts: []ledger.KeyPart{
{Type: convert.KeyPartOwner, Value: o},
{Type: convert.KeyPartKey, Value: o},
}}
// values are always 'v' for ease of testing/checking
v := ledger.Value{defaultRegisterValue}
pl := ledger.NewPayload(k, v)
p = append(p, *pl)
}
return p
}

func randomRegisterPaths(n uint16) []ledger.Path {
p := make([]ledger.Path, 0, n)
for i := uint16(0); i < n; i++ {
p = append(p, testutils.PathByUint16(i))
}
return p
}
Loading