From 418fe33038392c6c2ff471daa445539614bd900f Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Tue, 19 Sep 2023 09:42:45 -0400 Subject: [PATCH 01/31] initial commit --- storage/pebble/bootstrap.go | 1 + storage/pebble/bootstrap_test.go | 54 ++++++++++++++++++++++++++++++++ storage/pebble/lookup.go | 15 +++++++++ 3 files changed, 70 insertions(+) create mode 100644 storage/pebble/bootstrap.go create mode 100644 storage/pebble/bootstrap_test.go diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go new file mode 100644 index 00000000000..c86d0bc9ec3 --- /dev/null +++ b/storage/pebble/bootstrap.go @@ -0,0 +1 @@ +package pebble diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go new file mode 100644 index 00000000000..a540936e4e5 --- /dev/null +++ b/storage/pebble/bootstrap_test.go @@ -0,0 +1,54 @@ +package pebble + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/onflow/flow-go/ledger/complete/wal" +) + +type Bootstrap struct { + db *pebble.DB + done chan struct{} +} + +func NewBootstrap(db pebble.DB) *Bootstrap { + return &Bootstrap{ + db: db, + done: make(chan struct{}), + } +} + +func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode) error { + // collect entries + batch := b.db.NewBatch() + defer batch.Close() + for _, register := range registers { + payload := register.Payload + key, err := payload.Key() + if err != nil { + return fmt.Errorf("could not get key from register payload: %w", err) + } + + registerID, err := registerIDFromPayloadKey(key) + if err != nil { + return fmt.Errorf("could not get register ID from key: %w", err) + } + + encoded := newLookupKey(height, registerID).Bytes() + err = batch.Set(encoded, payload.Value(), nil) + if err != nil { + return fmt.Errorf("failed to set key: %w", err) + } + } + // batch insert to db + err := batch.Commit(pebble.Sync) + if err != nil { + return fmt.Errorf("failed to commit batch: %w", err) + } + return nil +} + +func IndexCheckpointFile(checkpointDir string) chan struct{} { + +} diff --git a/storage/pebble/lookup.go b/storage/pebble/lookup.go index 06b82c86edc..98cc9b4adda 100644 --- a/storage/pebble/lookup.go +++ b/storage/pebble/lookup.go @@ -5,6 +5,8 @@ import ( "encoding/binary" "fmt" + "github.com/onflow/flow-go/engine/execution/state" + "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/storage/pebble/registers" ) @@ -114,6 +116,19 @@ func lookupKeyToRegisterID(lookupKey []byte) (uint64, flow.RegisterID, error) { return height, regID, nil } +func registerIDFromPayloadKey(key ledger.Key) (flow.RegisterID, error) { + if len(key.KeyParts) != 2 || + key.KeyParts[0].Type != state.KeyPartOwner || + key.KeyParts[1].Type != state.KeyPartKey { + return flow.RegisterID{}, fmt.Errorf("key not in expected format: %s", key.String()) + } + + return flow.RegisterID{ + Owner: string(key.KeyParts[0].Value), + Key: string(key.KeyParts[1].Value), + }, nil +} + // Bytes returns the encoded lookup key. func (h lookupKey) Bytes() []byte { return h.encoded From 71599fb2d766893f1934ff569c4c7ad27f4fe987 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Tue, 19 Sep 2023 09:44:35 -0400 Subject: [PATCH 02/31] correct setup --- storage/pebble/bootstrap.go | 53 ++++++++++++++++++++++++++++++++ storage/pebble/bootstrap_test.go | 53 -------------------------------- 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index c86d0bc9ec3..a540936e4e5 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -1 +1,54 @@ package pebble + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/onflow/flow-go/ledger/complete/wal" +) + +type Bootstrap struct { + db *pebble.DB + done chan struct{} +} + +func NewBootstrap(db pebble.DB) *Bootstrap { + return &Bootstrap{ + db: db, + done: make(chan struct{}), + } +} + +func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode) error { + // collect entries + batch := b.db.NewBatch() + defer batch.Close() + for _, register := range registers { + payload := register.Payload + key, err := payload.Key() + if err != nil { + return fmt.Errorf("could not get key from register payload: %w", err) + } + + registerID, err := registerIDFromPayloadKey(key) + if err != nil { + return fmt.Errorf("could not get register ID from key: %w", err) + } + + encoded := newLookupKey(height, registerID).Bytes() + err = batch.Set(encoded, payload.Value(), nil) + if err != nil { + return fmt.Errorf("failed to set key: %w", err) + } + } + // batch insert to db + err := batch.Commit(pebble.Sync) + if err != nil { + return fmt.Errorf("failed to commit batch: %w", err) + } + return nil +} + +func IndexCheckpointFile(checkpointDir string) chan struct{} { + +} diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index a540936e4e5..c86d0bc9ec3 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -1,54 +1 @@ package pebble - -import ( - "fmt" - - "github.com/cockroachdb/pebble" - "github.com/onflow/flow-go/ledger/complete/wal" -) - -type Bootstrap struct { - db *pebble.DB - done chan struct{} -} - -func NewBootstrap(db pebble.DB) *Bootstrap { - return &Bootstrap{ - db: db, - done: make(chan struct{}), - } -} - -func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode) error { - // collect entries - batch := b.db.NewBatch() - defer batch.Close() - for _, register := range registers { - payload := register.Payload - key, err := payload.Key() - if err != nil { - return fmt.Errorf("could not get key from register payload: %w", err) - } - - registerID, err := registerIDFromPayloadKey(key) - if err != nil { - return fmt.Errorf("could not get register ID from key: %w", err) - } - - encoded := newLookupKey(height, registerID).Bytes() - err = batch.Set(encoded, payload.Value(), nil) - if err != nil { - return fmt.Errorf("failed to set key: %w", err) - } - } - // batch insert to db - err := batch.Commit(pebble.Sync) - if err != nil { - return fmt.Errorf("failed to commit batch: %w", err) - } - return nil -} - -func IndexCheckpointFile(checkpointDir string) chan struct{} { - -} From 6a32d16e0e00fc9e5d9ae759772ecbf0f64dc836 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Tue, 19 Sep 2023 09:42:45 -0400 Subject: [PATCH 03/31] initial commit --- storage/pebble/bootstrap.go | 1 + storage/pebble/bootstrap_test.go | 54 ++++++++++++++++++++++++++++++++ storage/pebble/lookup.go | 15 +++++++++ 3 files changed, 70 insertions(+) create mode 100644 storage/pebble/bootstrap.go create mode 100644 storage/pebble/bootstrap_test.go diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go new file mode 100644 index 00000000000..c86d0bc9ec3 --- /dev/null +++ b/storage/pebble/bootstrap.go @@ -0,0 +1 @@ +package pebble diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go new file mode 100644 index 00000000000..a540936e4e5 --- /dev/null +++ b/storage/pebble/bootstrap_test.go @@ -0,0 +1,54 @@ +package pebble + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/onflow/flow-go/ledger/complete/wal" +) + +type Bootstrap struct { + db *pebble.DB + done chan struct{} +} + +func NewBootstrap(db pebble.DB) *Bootstrap { + return &Bootstrap{ + db: db, + done: make(chan struct{}), + } +} + +func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode) error { + // collect entries + batch := b.db.NewBatch() + defer batch.Close() + for _, register := range registers { + payload := register.Payload + key, err := payload.Key() + if err != nil { + return fmt.Errorf("could not get key from register payload: %w", err) + } + + registerID, err := registerIDFromPayloadKey(key) + if err != nil { + return fmt.Errorf("could not get register ID from key: %w", err) + } + + encoded := newLookupKey(height, registerID).Bytes() + err = batch.Set(encoded, payload.Value(), nil) + if err != nil { + return fmt.Errorf("failed to set key: %w", err) + } + } + // batch insert to db + err := batch.Commit(pebble.Sync) + if err != nil { + return fmt.Errorf("failed to commit batch: %w", err) + } + return nil +} + +func IndexCheckpointFile(checkpointDir string) chan struct{} { + +} diff --git a/storage/pebble/lookup.go b/storage/pebble/lookup.go index 37f62b75a0e..10d42bb8f1e 100644 --- a/storage/pebble/lookup.go +++ b/storage/pebble/lookup.go @@ -5,6 +5,8 @@ import ( "encoding/binary" "fmt" + "github.com/onflow/flow-go/engine/execution/state" + "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/storage/pebble/registers" ) @@ -121,6 +123,19 @@ func lookupKeyToRegisterID(lookupKey []byte) (uint64, flow.RegisterID, error) { return height, regID, nil } +func registerIDFromPayloadKey(key ledger.Key) (flow.RegisterID, error) { + if len(key.KeyParts) != 2 || + key.KeyParts[0].Type != state.KeyPartOwner || + key.KeyParts[1].Type != state.KeyPartKey { + return flow.RegisterID{}, fmt.Errorf("key not in expected format: %s", key.String()) + } + + return flow.RegisterID{ + Owner: string(key.KeyParts[0].Value), + Key: string(key.KeyParts[1].Value), + }, nil +} + // Bytes returns the encoded lookup key. func (h lookupKey) Bytes() []byte { return h.encoded From e1c153b18375181db15cf73728eeafc8705f298f Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Tue, 19 Sep 2023 09:44:35 -0400 Subject: [PATCH 04/31] correct setup --- storage/pebble/bootstrap.go | 53 ++++++++++++++++++++++++++++++++ storage/pebble/bootstrap_test.go | 53 -------------------------------- 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index c86d0bc9ec3..a540936e4e5 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -1 +1,54 @@ package pebble + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/onflow/flow-go/ledger/complete/wal" +) + +type Bootstrap struct { + db *pebble.DB + done chan struct{} +} + +func NewBootstrap(db pebble.DB) *Bootstrap { + return &Bootstrap{ + db: db, + done: make(chan struct{}), + } +} + +func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode) error { + // collect entries + batch := b.db.NewBatch() + defer batch.Close() + for _, register := range registers { + payload := register.Payload + key, err := payload.Key() + if err != nil { + return fmt.Errorf("could not get key from register payload: %w", err) + } + + registerID, err := registerIDFromPayloadKey(key) + if err != nil { + return fmt.Errorf("could not get register ID from key: %w", err) + } + + encoded := newLookupKey(height, registerID).Bytes() + err = batch.Set(encoded, payload.Value(), nil) + if err != nil { + return fmt.Errorf("failed to set key: %w", err) + } + } + // batch insert to db + err := batch.Commit(pebble.Sync) + if err != nil { + return fmt.Errorf("failed to commit batch: %w", err) + } + return nil +} + +func IndexCheckpointFile(checkpointDir string) chan struct{} { + +} diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index a540936e4e5..c86d0bc9ec3 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -1,54 +1 @@ package pebble - -import ( - "fmt" - - "github.com/cockroachdb/pebble" - "github.com/onflow/flow-go/ledger/complete/wal" -) - -type Bootstrap struct { - db *pebble.DB - done chan struct{} -} - -func NewBootstrap(db pebble.DB) *Bootstrap { - return &Bootstrap{ - db: db, - done: make(chan struct{}), - } -} - -func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode) error { - // collect entries - batch := b.db.NewBatch() - defer batch.Close() - for _, register := range registers { - payload := register.Payload - key, err := payload.Key() - if err != nil { - return fmt.Errorf("could not get key from register payload: %w", err) - } - - registerID, err := registerIDFromPayloadKey(key) - if err != nil { - return fmt.Errorf("could not get register ID from key: %w", err) - } - - encoded := newLookupKey(height, registerID).Bytes() - err = batch.Set(encoded, payload.Value(), nil) - if err != nil { - return fmt.Errorf("failed to set key: %w", err) - } - } - // batch insert to db - err := batch.Commit(pebble.Sync) - if err != nil { - return fmt.Errorf("failed to commit batch: %w", err) - } - return nil -} - -func IndexCheckpointFile(checkpointDir string) chan struct{} { - -} From 518bb02f6677d7fe3c62c018c718fc2837d14ec3 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Tue, 19 Sep 2023 15:27:32 -0400 Subject: [PATCH 05/31] clarify structure so we can work in parallel if needed --- storage/pebble/bootstrap.go | 43 +++++++++++++++++++++++++++----- storage/pebble/lookup.go | 4 +-- storage/pebble/registers.go | 6 ++--- storage/pebble/registers_test.go | 4 +-- 4 files changed, 44 insertions(+), 13 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index a540936e4e5..6c5517c1278 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -8,14 +8,23 @@ import ( ) type Bootstrap struct { - db *pebble.DB - done chan struct{} + db *pebble.DB + rootHeight uint64 + done chan struct{} } -func NewBootstrap(db pebble.DB) *Bootstrap { +func NewBootstrap(db *pebble.DB, rootHeight uint64) *Bootstrap { + // check for pre-populated heights, fail if it is populated + // i.e. the IndexCheckpointFile function has already run for the db in this directory + _, _, err := db.Get(latestHeightKey()) + if err == nil { + // key detected, attempt to run bootstrap on corrupt or already bootstrapped data + panic("found latest key set on badger instance, cannot bootstrap populated DB") + } return &Bootstrap{ - db: db, - done: make(chan struct{}), + db: db, + done: make(chan struct{}), + rootHeight: rootHeight, } } @@ -49,6 +58,28 @@ func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode return nil } -func IndexCheckpointFile(checkpointDir string) chan struct{} { +// IndexCheckpointFile indexes the checkpoint file in the Dir provided and returns a channel that closes when done +func (b *Bootstrap) IndexCheckpointFile(checkpointDir string) <-chan struct{} { + // index checkpoint + + // update heights atomically in case one gets populated and the other doesn't, + // leaving it in a corrupted state + bat := b.db.NewBatch() + err := bat.Set(firstHeightKey(), EncodedUint64(b.rootHeight), nil) + if err != nil { + + } + err = bat.Set(latestHeightKey(), EncodedUint64(b.rootHeight), nil) + if err != nil { + + } + err = bat.Commit(pebble.Sync) + if err != nil { + + } +} + +// indexCheckpointFileWorker asynchronously indexes register entries from wal.OpenAndReadLeafNodesFromCheckpointV6 +func (b *Bootstrap) indexCheckpointFileWorker() <-chan bool { } diff --git a/storage/pebble/lookup.go b/storage/pebble/lookup.go index 10d42bb8f1e..6cf2749967c 100644 --- a/storage/pebble/lookup.go +++ b/storage/pebble/lookup.go @@ -27,14 +27,14 @@ func createHeightKey(identifier byte) []byte { // LatestHeightKey is a special case of a lookupKey // with keyLatestBlockHeight as key, no owner and a placeholder height of 0. // This is to ensure SeekPrefixGE in pebble does not break -func LatestHeightKey() []byte { +func latestHeightKey() []byte { return createHeightKey(codeLatestBlockHeight) } // FirstHeightKey is a special case of a lookupKey // with keyFirstBlockHeight as key, no owner and a placeholder height of 0. // This is to ensure SeekPrefixGE in pebble does not break -func FirstHeightKey() []byte { +func firstHeightKey() []byte { return createHeightKey(codeFirstBlockHeight) } diff --git a/storage/pebble/registers.go b/storage/pebble/registers.go index eda74b52e2c..62496afc671 100644 --- a/storage/pebble/registers.go +++ b/storage/pebble/registers.go @@ -109,7 +109,7 @@ func (s *Registers) Store( } } // increment height and commit - err := batch.Set(LatestHeightKey(), EncodedUint64(height), nil) + err := batch.Set(latestHeightKey(), EncodedUint64(height), nil) if err != nil { return fmt.Errorf("failed to update latest height %d", height) } @@ -124,12 +124,12 @@ func (s *Registers) Store( // LatestHeight Gets the latest height of complete registers available func (s *Registers) LatestHeight() (uint64, error) { - return s.heightLookup(LatestHeightKey()) + return s.heightLookup(latestHeightKey()) } // FirstHeight first indexed height found in the store, typically root block for the spork func (s *Registers) FirstHeight() (uint64, error) { - return s.heightLookup(FirstHeightKey()) + return s.heightLookup(firstHeightKey()) } func (s *Registers) heightLookup(key []byte) (uint64, error) { diff --git a/storage/pebble/registers_test.go b/storage/pebble/registers_test.go index f625ffcbcf3..a4f4d9d7554 100644 --- a/storage/pebble/registers_test.go +++ b/storage/pebble/registers_test.go @@ -265,8 +265,8 @@ func RunWithRegistersStorageAtInitialHeights(tb testing.TB, first uint64, latest opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) unittest.RunWithConfiguredPebbleInstance(tb, opts, func(p *pebble.DB) { // insert initial heights to pebble - require.NoError(tb, p.Set(FirstHeightKey(), EncodedUint64(first), nil)) - require.NoError(tb, p.Set(LatestHeightKey(), EncodedUint64(latest), nil)) + require.NoError(tb, p.Set(firstHeightKey(), EncodedUint64(first), nil)) + require.NoError(tb, p.Set(latestHeightKey(), EncodedUint64(latest), nil)) r, err := NewRegisters(p) require.NoError(tb, err) f(r) From 21fa90fce1c0b39c62359bc8be6e35aaabca681e Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Tue, 19 Sep 2023 16:44:41 -0400 Subject: [PATCH 06/31] stub implementation --- storage/pebble/bootstrap.go | 68 +++++++++++++++++++++----------- storage/pebble/bootstrap_test.go | 10 +++++ 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 6c5517c1278..1b50e686dee 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -8,12 +8,13 @@ import ( ) type Bootstrap struct { - db *pebble.DB - rootHeight uint64 - done chan struct{} + checkpointDir string + db *pebble.DB + done chan struct{} + rootHeight uint64 } -func NewBootstrap(db *pebble.DB, rootHeight uint64) *Bootstrap { +func NewBootstrap(db *pebble.DB, checkpointDir string, rootHeight uint64) *Bootstrap { // check for pre-populated heights, fail if it is populated // i.e. the IndexCheckpointFile function has already run for the db in this directory _, _, err := db.Get(latestHeightKey()) @@ -22,9 +23,10 @@ func NewBootstrap(db *pebble.DB, rootHeight uint64) *Bootstrap { panic("found latest key set on badger instance, cannot bootstrap populated DB") } return &Bootstrap{ - db: db, - done: make(chan struct{}), - rootHeight: rootHeight, + checkpointDir: checkpointDir, + db: db, + done: make(chan struct{}), + rootHeight: rootHeight, } } @@ -59,27 +61,49 @@ func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode } // IndexCheckpointFile indexes the checkpoint file in the Dir provided and returns a channel that closes when done -func (b *Bootstrap) IndexCheckpointFile(checkpointDir string) <-chan struct{} { +func (b *Bootstrap) IndexCheckpointFile() <-chan error { + c := make(chan error, 1) // index checkpoint - // update heights atomically in case one gets populated and the other doesn't, - // leaving it in a corrupted state - bat := b.db.NewBatch() - err := bat.Set(firstHeightKey(), EncodedUint64(b.rootHeight), nil) - if err != nil { - - } - err = bat.Set(latestHeightKey(), EncodedUint64(b.rootHeight), nil) - if err != nil { + go func() { + bat := b.db.NewBatch() - } - err = bat.Commit(pebble.Sync) - if err != nil { + defer func() { + close(c) + err := bat.Close() + if err != nil { - } + } + }() + // index checkpoint file async + doneIndex := b.indexCheckpointFileWorker() + err := <-doneIndex + if err != nil { + c <- fmt.Errorf("failed to index checkpoint files: %w", err) + return + } + // update heights atomically to prevent one getting populated without the other + // leaving it in a corrupted state + err = bat.Set(firstHeightKey(), EncodedUint64(b.rootHeight), nil) + if err != nil { + c <- fmt.Errorf("failed to set first height %w", err) + return + } + err = bat.Set(latestHeightKey(), EncodedUint64(b.rootHeight), nil) + if err != nil { + c <- fmt.Errorf("failed to set latest height %w", err) + return + } + err = bat.Commit(pebble.Sync) + if err != nil { + c <- fmt.Errorf("failed to commit height updates %w", err) + return + } + }() + return c } // indexCheckpointFileWorker asynchronously indexes register entries from wal.OpenAndReadLeafNodesFromCheckpointV6 -func (b *Bootstrap) indexCheckpointFileWorker() <-chan bool { +func (b *Bootstrap) indexCheckpointFileWorker() <-chan error { } diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index c86d0bc9ec3..01f8f0b2c1b 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -1 +1,11 @@ package pebble + +import "testing" + +func TestBootstrap_IndexCheckpointFile(t *testing.T) { + +} + +func TestBootstrap_Initialize(t *testing.T) { + +} From b5b76ebfc7fc2d65bd35f79db1645e67283a60dd Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Tue, 19 Sep 2023 16:45:50 -0400 Subject: [PATCH 07/31] add docs --- storage/pebble/bootstrap.go | 13 ++++++------- storage/pebble/bootstrap_test.go | 29 ++++++++++++++++++++++++++--- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 1b50e686dee..787c0035cb2 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -14,20 +14,20 @@ type Bootstrap struct { rootHeight uint64 } -func NewBootstrap(db *pebble.DB, checkpointDir string, rootHeight uint64) *Bootstrap { +func NewBootstrap(db *pebble.DB, checkpointDir string, rootHeight uint64) (*Bootstrap, error) { // check for pre-populated heights, fail if it is populated // i.e. the IndexCheckpointFile function has already run for the db in this directory _, _, err := db.Get(latestHeightKey()) if err == nil { // key detected, attempt to run bootstrap on corrupt or already bootstrapped data - panic("found latest key set on badger instance, cannot bootstrap populated DB") + return nil, fmt.Errorf("found latest key set on badger instance, cannot bootstrap populated DB") } return &Bootstrap{ checkpointDir: checkpointDir, db: db, done: make(chan struct{}), rootHeight: rootHeight, - } + }, nil } func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode) error { @@ -63,8 +63,6 @@ func (b *Bootstrap) batchIndexRegisters(height uint64, registers []*wal.LeafNode // IndexCheckpointFile indexes the checkpoint file in the Dir provided and returns a channel that closes when done func (b *Bootstrap) IndexCheckpointFile() <-chan error { c := make(chan error, 1) - // index checkpoint - go func() { bat := b.db.NewBatch() @@ -103,7 +101,8 @@ func (b *Bootstrap) IndexCheckpointFile() <-chan error { return c } -// indexCheckpointFileWorker asynchronously indexes register entries from wal.OpenAndReadLeafNodesFromCheckpointV6 +// indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir +// with wal.OpenAndReadLeafNodesFromCheckpointV6 func (b *Bootstrap) indexCheckpointFileWorker() <-chan error { - + panic("not implemented") } diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 01f8f0b2c1b..3b2d90ff54d 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -1,11 +1,34 @@ package pebble -import "testing" +import ( + "path" + "testing" -func TestBootstrap_IndexCheckpointFile(t *testing.T) { + "github.com/cockroachdb/pebble" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage/pebble/registers" + "github.com/onflow/flow-go/utils/unittest" +) +func TestBootstrap_NewBootstrap(t *testing.T) { + sampleDir := path.Join(unittest.TempDir(t), "checkpoint.checkpoint") + rootHeight := uint64(1) + cache := pebble.NewCache(1 << 20) + opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) + unittest.RunWithConfiguredPebbleInstance(t, opts, func(p *pebble.DB) { + // no issues when pebble instance is blank + _, err := NewBootstrap(p, sampleDir, rootHeight) + require.NoError(t, err) + // set heights + require.NoError(t, p.Set(firstHeightKey(), EncodedUint64(rootHeight), nil)) + require.NoError(t, p.Set(latestHeightKey(), EncodedUint64(rootHeight), nil)) + // errors if FirstHeight or LastHeight are populated + _, err = NewBootstrap(p, sampleDir, rootHeight) + require.ErrorContains(t, err, "cannot bootstrap populated DB") + }) } -func TestBootstrap_Initialize(t *testing.T) { +func TestBootstrap_IndexCheckpointFile(t *testing.T) { } From 09ed22882d3ddc4527fd5c883410fb77a419891e Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 24 Sep 2023 22:14:13 -0400 Subject: [PATCH 08/31] tests --- storage/pebble/bootstrap.go | 54 ++++++------- storage/pebble/bootstrap_test.go | 134 ++++++++++++++++++++++++++++++- 2 files changed, 157 insertions(+), 31 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index d69511cb57e..31229955935 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -40,7 +40,6 @@ func NewBootstrap(db *pebble.DB, checkpointFile string, rootHeight uint64, log z } func (b *Bootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error { - // collect entries batch := b.db.NewBatch() defer batch.Close() for _, register := range leafNodes { @@ -61,7 +60,6 @@ func (b *Bootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error { return fmt.Errorf("failed to set key: %w", err) } } - // batch insert to db err := batch.Commit(pebble.Sync) if err != nil { return fmt.Errorf("failed to commit batch: %w", err) @@ -69,10 +67,31 @@ func (b *Bootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error { return nil } -// IndexCheckpointFile indexes the checkpoint file in the Dir provided and returns a channel that closes when done +// indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir +// with wal.OpenAndReadLeafNodesFromCheckpointV6 +func (b *Bootstrap) indexCheckpointFileWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + select { + case <-ctx.Done(): + return + default: + } + // collect leaf nodes to batch index until the channel is closed + batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) + for leafNode := range b.leafNodeChan { + batch = append(batch, leafNode) + if len(batch) >= pebbleBootstrapRegisterBatchLen { + err := b.batchIndexRegisters(batch) + if err != nil { + ctx.Throw(fmt.Errorf("unable to index registers to pebble: %w", err)) + } + } + } +} + +// IndexCheckpointFile indexes the checkpoint file in the Dir provided as a component func (b *Bootstrap) IndexCheckpointFile(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { ready() - bat := b.db.NewBatch() // index checkpoint file async cmb := component.NewComponentManagerBuilder() for i := 0; i < pebbleBootstrapWorkerCount; i++ { @@ -86,8 +105,11 @@ func (b *Bootstrap) IndexCheckpointFile(ctx irrecoverable.SignalerContext, ready // error in reading a leaf node ctx.Throw(fmt.Errorf("error reading leaf node: %w", err)) } + // wait for the indexing to finish before populating heights <-c.Done() + bat := b.db.NewBatch() + defer bat.Close() // update heights atomically to prevent one getting populated without the other // leaving it in a corrupted state err = bat.Set(firstHeightKey(), encodedUint64(b.rootHeight), nil) @@ -103,27 +125,3 @@ func (b *Bootstrap) IndexCheckpointFile(ctx irrecoverable.SignalerContext, ready ctx.Throw(fmt.Errorf("unable to index first and latest heights: %w", err)) } } - -// indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir -// with wal.OpenAndReadLeafNodesFromCheckpointV6 -func (b *Bootstrap) indexCheckpointFileWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - ready() - select { - case <-ctx.Done(): - return - default: - } - // collect leaf nodes to batch - batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) - for leafNode := range b.leafNodeChan { - batch = append(batch, leafNode) - if len(batch) >= pebbleBootstrapRegisterBatchLen { - // index to pebble - err := b.batchIndexRegisters(batch) - if err != nil { - ctx.Throw(fmt.Errorf("unable to index registers to pebble: %w", err)) - } - return - } - } -} diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index df5882eae1e..737daee1446 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -1,10 +1,17 @@ package pebble import ( + "crypto/rand" + "io" "path" "testing" "github.com/cockroachdb/pebble" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/testutils" + "github.com/onflow/flow-go/ledger/complete/mtrie/trie" + "github.com/onflow/flow-go/ledger/complete/wal" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/storage/pebble/registers" @@ -12,23 +19,144 @@ import ( ) func TestBootstrap_NewBootstrap(t *testing.T) { + t.Parallel() sampleDir := path.Join(unittest.TempDir(t), "checkpoint.checkpoint") rootHeight := uint64(1) + log := zerolog.New(io.Discard) cache := pebble.NewCache(1 << 20) opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) unittest.RunWithConfiguredPebbleInstance(t, opts, func(p *pebble.DB) { // no issues when pebble instance is blank - _, err := NewBootstrap(p, sampleDir, rootHeight) + _, err := NewBootstrap(p, sampleDir, rootHeight, log) require.NoError(t, err) // set heights require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil)) require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil)) // errors if FirstHeight or LastHeight are populated - _, err = NewBootstrap(p, sampleDir, rootHeight) + _, err = NewBootstrap(p, sampleDir, rootHeight, log) require.ErrorContains(t, err, "cannot bootstrap populated DB") }) } -func TestBootstrap_IndexCheckpointFile(t *testing.T) { +func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { + t.Parallel() + log := zerolog.New(io.Discard) + rootHeight := uint64(10000) + // write empty trie + unittest.RunWithTempDir(t, func(dir string) { + t.Parallel() + fileName := "empty-checkpoint" + emptyTrie := []*trie.MTrie{trie.NewEmptyMTrie()} + require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(emptyTrie, dir, fileName, log), "fail to store checkpoint") + checkpointFile := path.Join(dir, fileName) + unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { + bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) + require.NoError(t, err) + }) + }) + + unittest.RunWithTempDir(t, func(dir string) { + t.Parallel() + tries := createSimpleTrie(t) + fileName := "simple-checkpoint" + require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") + unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { + + }) + }) + + unittest.RunWithTempDir(t, func(dir string) { + t.Parallel() + tries := createMultipleRandomTriesMini(t) + fileName := "random-checkpoint" + require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") + unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { + + }) + }) +} + +func TestBootstrap_IndexCheckpointFile_Error(t *testing.T) { + t.Parallel() + unittest.RunWithTempDir(t, func(dir string) { + log := zerolog.New(io.Discard) + // write trie and remove part of the file + + unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { + + }) + }) +} + +func getTestingPebbleOpts() *pebble.Options { + cache := pebble.NewCache(1 << 20) + return DefaultPebbleOptions(cache, registers.NewMVCCComparer()) +} + +// Todo: Move these functions to somewhere common, this is from checkpoint_v6_test.go +func createSimpleTrie(t *testing.T) []*trie.MTrie { + emptyTrie := trie.NewEmptyMTrie() + + p1 := testutils.PathByUint8(0) + v1 := testutils.LightPayload8('A', 'a') + + p2 := testutils.PathByUint8(1) + v2 := testutils.LightPayload8('B', 'b') + + paths := []ledger.Path{p1, p2} + payloads := []ledger.Payload{*v1, *v2} + + updatedTrie, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true) + require.NoError(t, err) + tries := []*trie.MTrie{emptyTrie, updatedTrie} + return tries +} + +func randPathPayload() (ledger.Path, ledger.Payload) { + var path ledger.Path + _, err := rand.Read(path[:]) + if err != nil { + panic("randomness failed") + } + payload := testutils.RandomPayload(1, 100) + return path, *payload +} + +func randNPathPayloads(n int) ([]ledger.Path, []ledger.Payload) { + paths := make([]ledger.Path, n) + payloads := make([]ledger.Payload, n) + for i := 0; i < n; i++ { + path, payload := randPathPayload() + paths[i] = path + payloads[i] = payload + } + return paths, payloads +} + +func createMultipleRandomTriesMini(t *testing.T) []*trie.MTrie { + tries := make([]*trie.MTrie, 0) + activeTrie := trie.NewEmptyMTrie() + + var err error + // add tries with no shared paths + for i := 0; i < 5; i++ { + paths, payloads := randNPathPayloads(20) + activeTrie, _, err = trie.NewTrieWithUpdatedRegisters(activeTrie, paths, payloads, false) + require.NoError(t, err, "update registers") + tries = append(tries, activeTrie) + } + + // add trie with some shared path + sharedPaths, payloads1 := randNPathPayloads(10) + activeTrie, _, err = trie.NewTrieWithUpdatedRegisters(activeTrie, sharedPaths, payloads1, false) + require.NoError(t, err, "update registers") + tries = append(tries, activeTrie) + + _, payloads2 := randNPathPayloads(10) + activeTrie, _, err = trie.NewTrieWithUpdatedRegisters(activeTrie, sharedPaths, payloads2, false) + require.NoError(t, err, "update registers") + tries = append(tries, activeTrie) + + return tries } From 149c17c50bc3634caceb36c99638bcc82bce53f8 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 24 Sep 2023 22:22:30 -0400 Subject: [PATCH 09/31] fix tests --- storage/pebble/bootstrap_test.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 737daee1446..c39adf70c22 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/ledger/common/testutils" "github.com/onflow/flow-go/ledger/complete/mtrie/trie" "github.com/onflow/flow-go/ledger/complete/wal" + "github.com/onflow/flow-go/module/component" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -52,7 +53,8 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) require.NoError(t, err) - + cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() + <-cm.Done() }) }) @@ -61,8 +63,12 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { tries := createSimpleTrie(t) fileName := "simple-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") + checkpointFile := path.Join(dir, fileName) unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { - + bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) + require.NoError(t, err) + cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() + <-cm.Done() }) }) @@ -70,9 +76,13 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { t.Parallel() tries := createMultipleRandomTriesMini(t) fileName := "random-checkpoint" + checkpointFile := path.Join(dir, fileName) require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { - + bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) + require.NoError(t, err) + cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() + <-cm.Done() }) }) } From bc71659859a5c2f9b12ceed6189e51f64b621e41 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 24 Sep 2023 22:23:58 -0400 Subject: [PATCH 10/31] create error case --- storage/pebble/bootstrap_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index c39adf70c22..8f7fe1a1966 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -92,6 +92,10 @@ func TestBootstrap_IndexCheckpointFile_Error(t *testing.T) { unittest.RunWithTempDir(t, func(dir string) { log := zerolog.New(io.Discard) // write trie and remove part of the file + fileName := "simple-checkpoint" + tries := createSimpleTrie(t) + require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") + checkpointFile := path.Join(dir, fileName) unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { From ab271ecfb920d455e1ae8d55963391ad0b7a7f98 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 24 Sep 2023 23:09:55 -0400 Subject: [PATCH 11/31] adjust pebble tests for context --- storage/pebble/bootstrap.go | 1 - storage/pebble/bootstrap_test.go | 57 +++++++++++++++++--------------- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 31229955935..6a4e989b79a 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -105,7 +105,6 @@ func (b *Bootstrap) IndexCheckpointFile(ctx irrecoverable.SignalerContext, ready // error in reading a leaf node ctx.Throw(fmt.Errorf("error reading leaf node: %w", err)) } - // wait for the indexing to finish before populating heights <-c.Done() bat := b.db.NewBatch() diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 8f7fe1a1966..1e0dcc78ac4 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -1,6 +1,7 @@ package pebble import ( + "context" "crypto/rand" "io" "path" @@ -12,6 +13,7 @@ import ( "github.com/onflow/flow-go/ledger/complete/mtrie/trie" "github.com/onflow/flow-go/ledger/complete/wal" "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -45,7 +47,6 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { rootHeight := uint64(10000) // write empty trie unittest.RunWithTempDir(t, func(dir string) { - t.Parallel() fileName := "empty-checkpoint" emptyTrie := []*trie.MTrie{trie.NewEmptyMTrie()} require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(emptyTrie, dir, fileName, log), "fail to store checkpoint") @@ -53,13 +54,14 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) require.NoError(t, err) - cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() - <-cm.Done() + ctx, cancel := context.WithCancel(context.Background()) + irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) + component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build().Start(irrecoverableCtx) + cancel() }) }) unittest.RunWithTempDir(t, func(dir string) { - t.Parallel() tries := createSimpleTrie(t) fileName := "simple-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") @@ -67,13 +69,14 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) require.NoError(t, err) - cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() - <-cm.Done() + ctx, cancel := context.WithCancel(context.Background()) + irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) + component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build().Start(irrecoverableCtx) + cancel() }) }) unittest.RunWithTempDir(t, func(dir string) { - t.Parallel() tries := createMultipleRandomTriesMini(t) fileName := "random-checkpoint" checkpointFile := path.Join(dir, fileName) @@ -81,26 +84,28 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) require.NoError(t, err) - cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() - <-cm.Done() + ctx, cancel := context.WithCancel(context.Background()) + irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) + component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build().Start(irrecoverableCtx) + cancel() }) }) } func TestBootstrap_IndexCheckpointFile_Error(t *testing.T) { t.Parallel() - unittest.RunWithTempDir(t, func(dir string) { - log := zerolog.New(io.Discard) - // write trie and remove part of the file - fileName := "simple-checkpoint" - tries := createSimpleTrie(t) - require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") - checkpointFile := path.Join(dir, fileName) - - unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { - - }) - }) + //unittest.RunWithTempDir(t, func(dir string) { + // log := zerolog.New(io.Discard) + // // write trie and remove part of the file + // fileName := "simple-checkpoint" + // tries := createSimpleTrie(t) + // require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") + // checkpointFile := path.Join(dir, fileName) + // + // unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { + // + // }) + //}) } func getTestingPebbleOpts() *pebble.Options { @@ -128,21 +133,21 @@ func createSimpleTrie(t *testing.T) []*trie.MTrie { } func randPathPayload() (ledger.Path, ledger.Payload) { - var path ledger.Path - _, err := rand.Read(path[:]) + var p ledger.Path + _, err := rand.Read(p[:]) if err != nil { panic("randomness failed") } payload := testutils.RandomPayload(1, 100) - return path, *payload + return p, *payload } func randNPathPayloads(n int) ([]ledger.Path, []ledger.Payload) { paths := make([]ledger.Path, n) payloads := make([]ledger.Payload, n) for i := 0; i < n; i++ { - path, payload := randPathPayload() - paths[i] = path + p, payload := randPathPayload() + paths[i] = p payloads[i] = payload } return paths, payloads From bfec254f639c0854a72d891302126f8f1f15c23f Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 24 Sep 2023 23:49:43 -0400 Subject: [PATCH 12/31] add ctx.Done() check --- storage/pebble/bootstrap_test.go | 70 ++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 1e0dcc78ac4..6289d2e29e6 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "io" + "os" "path" "testing" @@ -27,6 +28,7 @@ func TestBootstrap_NewBootstrap(t *testing.T) { rootHeight := uint64(1) log := zerolog.New(io.Discard) cache := pebble.NewCache(1 << 20) + defer cache.Unref() opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) unittest.RunWithConfiguredPebbleInstance(t, opts, func(p *pebble.DB) { // no issues when pebble instance is blank @@ -42,23 +44,27 @@ func TestBootstrap_NewBootstrap(t *testing.T) { } func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { - t.Parallel() log := zerolog.New(io.Discard) rootHeight := uint64(10000) - // write empty trie unittest.RunWithTempDir(t, func(dir string) { fileName := "empty-checkpoint" emptyTrie := []*trie.MTrie{trie.NewEmptyMTrie()} require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(emptyTrie, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { - bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) - component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build().Start(irrecoverableCtx) - cancel() - }) + cache := pebble.NewCache(1 << 20) + opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) + defer cache.Unref() + pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) + bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) + cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() + cm.Start(irrecoverableCtx) + <-cm.Done() + defer cancel() + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) }) unittest.RunWithTempDir(t, func(dir string) { @@ -66,14 +72,20 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { fileName := "simple-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { - bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) - component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build().Start(irrecoverableCtx) - cancel() - }) + cache := pebble.NewCache(1 << 20) + opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) + defer cache.Unref() + pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) + bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) + cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() + cm.Start(irrecoverableCtx) + <-cm.Done() + defer cancel() + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) }) unittest.RunWithTempDir(t, func(dir string) { @@ -81,14 +93,20 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { fileName := "random-checkpoint" checkpointFile := path.Join(dir, fileName) require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") - unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { - bootstrap, err := NewBootstrap(p, checkpointFile, rootHeight, log) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) - component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build().Start(irrecoverableCtx) - cancel() - }) + cache := pebble.NewCache(1 << 20) + opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) + defer cache.Unref() + pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) + bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) + cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() + cm.Start(irrecoverableCtx) + <-cm.Done() + defer cancel() + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) }) } From c8c1811d560f009bdd9143c72f22bc1089d73869 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Tue, 26 Sep 2023 10:25:41 -0400 Subject: [PATCH 13/31] fix checkpoint opening and worker race condition --- storage/pebble/bootstrap.go | 17 +++++++------ storage/pebble/bootstrap_test.go | 43 ++++++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 6a4e989b79a..29b251bc628 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -85,26 +85,26 @@ func (b *Bootstrap) indexCheckpointFileWorker(ctx irrecoverable.SignalerContext, if err != nil { ctx.Throw(fmt.Errorf("unable to index registers to pebble: %w", err)) } + batch = make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) } } } // IndexCheckpointFile indexes the checkpoint file in the Dir provided as a component -func (b *Bootstrap) IndexCheckpointFile(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - ready() +func (b *Bootstrap) IndexCheckpointFile(parentCtx irrecoverable.SignalerContext) error { // index checkpoint file async cmb := component.NewComponentManagerBuilder() for i := 0; i < pebbleBootstrapWorkerCount; i++ { // create workers to read and index registers cmb.AddWorker(b.indexCheckpointFileWorker) } - c := cmb.Build() - c.Start(ctx) err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log) if err != nil { // error in reading a leaf node - ctx.Throw(fmt.Errorf("error reading leaf node: %w", err)) + return fmt.Errorf("error reading leaf node: %w", err) } + c := cmb.Build() + c.Start(parentCtx) // wait for the indexing to finish before populating heights <-c.Done() bat := b.db.NewBatch() @@ -113,14 +113,15 @@ func (b *Bootstrap) IndexCheckpointFile(ctx irrecoverable.SignalerContext, ready // leaving it in a corrupted state err = bat.Set(firstHeightKey(), encodedUint64(b.rootHeight), nil) if err != nil { - ctx.Throw(fmt.Errorf("unable to add first height to batch: %w", err)) + return fmt.Errorf("unable to add first height to batch: %w", err) } err = bat.Set(latestHeightKey(), encodedUint64(b.rootHeight), nil) if err != nil { - ctx.Throw(fmt.Errorf("unable to add latest height to batch: %w", err)) + return fmt.Errorf("unable to add latest height to batch: %w", err) } err = bat.Commit(pebble.Sync) if err != nil { - ctx.Throw(fmt.Errorf("unable to index first and latest heights: %w", err)) + return fmt.Errorf("unable to index first and latest heights: %w", err) } + return nil } diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 6289d2e29e6..98efd248e60 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -13,7 +13,6 @@ import ( "github.com/onflow/flow-go/ledger/common/testutils" "github.com/onflow/flow-go/ledger/complete/mtrie/trie" "github.com/onflow/flow-go/ledger/complete/wal" - "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -44,6 +43,7 @@ func TestBootstrap_NewBootstrap(t *testing.T) { } func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { + t.Parallel() log := zerolog.New(io.Discard) rootHeight := uint64(10000) unittest.RunWithTempDir(t, func(dir string) { @@ -51,18 +51,26 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { emptyTrie := []*trie.MTrie{trie.NewEmptyMTrie()} require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(emptyTrie, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) + cache := pebble.NewCache(1 << 20) opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) defer cache.Unref() pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) + bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) - cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() - cm.Start(irrecoverableCtx) - <-cm.Done() + err = bootstrap.IndexCheckpointFile(irrecoverableCtx) + require.NoError(t, err) defer cancel() + // create registers instance and check values + reg, err := NewRegisters(pb) + + require.Equal(t, reg.LatestHeight(), rootHeight) + require.Equal(t, reg.FirstHeight(), rootHeight) + + require.NoError(t, err) require.NoError(t, pb.Close()) require.NoError(t, os.RemoveAll(dbDir)) }) @@ -80,10 +88,17 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) - cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() - cm.Start(irrecoverableCtx) - <-cm.Done() + err = bootstrap.IndexCheckpointFile(irrecoverableCtx) + require.NoError(t, err) defer cancel() + + // create registers instance and check values + reg, err := NewRegisters(pb) + + require.Equal(t, reg.LatestHeight(), rootHeight) + require.Equal(t, reg.FirstHeight(), rootHeight) + + require.NoError(t, err) require.NoError(t, pb.Close()) require.NoError(t, os.RemoveAll(dbDir)) }) @@ -93,18 +108,26 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { fileName := "random-checkpoint" checkpointFile := path.Join(dir, fileName) require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") + cache := pebble.NewCache(1 << 20) opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) defer cache.Unref() pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) - cm := component.NewComponentManagerBuilder().AddWorker(bootstrap.IndexCheckpointFile).Build() - cm.Start(irrecoverableCtx) - <-cm.Done() + err = bootstrap.IndexCheckpointFile(irrecoverableCtx) + require.NoError(t, err) defer cancel() + // create registers instance and check values + reg, err := NewRegisters(pb) + + require.Equal(t, reg.LatestHeight(), rootHeight) + require.Equal(t, reg.FirstHeight(), rootHeight) + + require.NoError(t, err) require.NoError(t, pb.Close()) require.NoError(t, os.RemoveAll(dbDir)) }) From 93a52dd68eb16afdadbd6948c781e97e9a2cf33d Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Tue, 26 Sep 2023 23:24:37 -0400 Subject: [PATCH 14/31] make skeletons for remaining tests --- storage/pebble/bootstrap.go | 20 +++- storage/pebble/bootstrap_test.go | 175 ++++++++++--------------------- storage/pebble/lookup.go | 4 +- 3 files changed, 77 insertions(+), 122 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 29b251bc628..1a70e7f9d8e 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -1,6 +1,7 @@ package pebble import ( + "context" "fmt" "path/filepath" @@ -83,15 +84,21 @@ func (b *Bootstrap) indexCheckpointFileWorker(ctx irrecoverable.SignalerContext, if len(batch) >= pebbleBootstrapRegisterBatchLen { err := b.batchIndexRegisters(batch) if err != nil { - ctx.Throw(fmt.Errorf("unable to index registers to pebble: %w", err)) + ctx.Throw(fmt.Errorf("unable to index registers to pebble in batch: %w", err)) } batch = make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) } } + err := b.batchIndexRegisters(batch) + if err != nil { + ctx.Throw(fmt.Errorf("unable to index remaining registers to pebble: %w", err)) + } } // IndexCheckpointFile indexes the checkpoint file in the Dir provided as a component -func (b *Bootstrap) IndexCheckpointFile(parentCtx irrecoverable.SignalerContext) error { +func (b *Bootstrap) IndexCheckpointFile() error { + ctx := context.Background() + sigCtx, errChan := irrecoverable.WithSignaler(ctx) // index checkpoint file async cmb := component.NewComponentManagerBuilder() for i := 0; i < pebbleBootstrapWorkerCount; i++ { @@ -104,9 +111,16 @@ func (b *Bootstrap) IndexCheckpointFile(parentCtx irrecoverable.SignalerContext) return fmt.Errorf("error reading leaf node: %w", err) } c := cmb.Build() - c.Start(parentCtx) + c.Start(sigCtx) // wait for the indexing to finish before populating heights <-c.Done() + + select { + case procErr := <-errChan: + return fmt.Errorf("failed to index checkpoint file: %w", procErr) + default: + } + bat := b.db.NewBatch() defer bat.Close() // update heights atomically to prevent one getting populated without the other diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 98efd248e60..1a90a5ab6ee 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -1,19 +1,18 @@ package pebble import ( - "context" - "crypto/rand" "io" "os" "path" "testing" "github.com/cockroachdb/pebble" + "github.com/onflow/flow-go/engine/execution/state" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/testutils" "github.com/onflow/flow-go/ledger/complete/mtrie/trie" "github.com/onflow/flow-go/ledger/complete/wal" - "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/model/flow" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -33,51 +32,58 @@ func TestBootstrap_NewBootstrap(t *testing.T) { // no issues when pebble instance is blank _, err := NewBootstrap(p, sampleDir, rootHeight, log) require.NoError(t, err) + }) + unittest.RunWithConfiguredPebbleInstance(t, opts, func(p *pebble.DB) { // set heights require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil)) require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil)) // errors if FirstHeight or LastHeight are populated - _, err = NewBootstrap(p, sampleDir, rootHeight, log) + _, err := NewBootstrap(p, sampleDir, rootHeight, log) require.ErrorContains(t, err, "cannot bootstrap populated DB") }) } -func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { +func TestBootstrap_IndexCheckpointFile_Happy(t *testing.T) { t.Parallel() log := zerolog.New(io.Discard) rootHeight := uint64(10000) unittest.RunWithTempDir(t, func(dir string) { - fileName := "empty-checkpoint" - emptyTrie := []*trie.MTrie{trie.NewEmptyMTrie()} - require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(emptyTrie, dir, fileName, log), "fail to store checkpoint") + tries, registerIDs := simpleTrieWithValidRegisterIDs(t) + fileName := "simple-checkpoint" + require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - cache := pebble.NewCache(1 << 20) opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) defer cache.Unref() pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) - bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) - err = bootstrap.IndexCheckpointFile(irrecoverableCtx) + err = bootstrap.IndexCheckpointFile() require.NoError(t, err) - defer cancel() + // create registers instance and check values reg, err := NewRegisters(pb) + require.NoError(t, err) require.Equal(t, reg.LatestHeight(), rootHeight) require.Equal(t, reg.FirstHeight(), rootHeight) - require.NoError(t, err) + for _, register := range registerIDs { + err, value := reg.Get(*register, rootHeight) + } + require.NoError(t, pb.Close()) require.NoError(t, os.RemoveAll(dbDir)) }) +} +func TestBootstrap_IndexCheckpointFile_Empty(t *testing.T) { + t.Parallel() + log := zerolog.New(io.Discard) + rootHeight := uint64(10000) unittest.RunWithTempDir(t, func(dir string) { - tries := createSimpleTrie(t) - fileName := "simple-checkpoint" + tries := []*trie.MTrie{trie.NewEmptyMTrie()} + fileName := "empty-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) cache := pebble.NewCache(1 << 20) @@ -86,137 +92,72 @@ func TestBootstrap_IndexCheckpointFile_Random(t *testing.T) { pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) - err = bootstrap.IndexCheckpointFile(irrecoverableCtx) + err = bootstrap.IndexCheckpointFile() require.NoError(t, err) - defer cancel() // create registers instance and check values reg, err := NewRegisters(pb) + require.NoError(t, err) require.Equal(t, reg.LatestHeight(), rootHeight) require.Equal(t, reg.FirstHeight(), rootHeight) - require.NoError(t, err) require.NoError(t, pb.Close()) require.NoError(t, os.RemoveAll(dbDir)) }) +} - unittest.RunWithTempDir(t, func(dir string) { - tries := createMultipleRandomTriesMini(t) - fileName := "random-checkpoint" - checkpointFile := path.Join(dir, fileName) - require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") - - cache := pebble.NewCache(1 << 20) - opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - defer cache.Unref() - pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) - bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx) - err = bootstrap.IndexCheckpointFile(irrecoverableCtx) - require.NoError(t, err) - defer cancel() - // create registers instance and check values - reg, err := NewRegisters(pb) - - require.Equal(t, reg.LatestHeight(), rootHeight) - require.Equal(t, reg.FirstHeight(), rootHeight) +func TestBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { + t.Parallel() - require.NoError(t, err) - require.NoError(t, pb.Close()) - require.NoError(t, os.RemoveAll(dbDir)) - }) } func TestBootstrap_IndexCheckpointFile_Error(t *testing.T) { t.Parallel() - //unittest.RunWithTempDir(t, func(dir string) { - // log := zerolog.New(io.Discard) - // // write trie and remove part of the file - // fileName := "simple-checkpoint" - // tries := createSimpleTrie(t) - // require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") - // checkpointFile := path.Join(dir, fileName) - // - // unittest.RunWithConfiguredPebbleInstance(t, getTestingPebbleOpts(), func(p *pebble.DB) { - // - // }) - //}) -} -func getTestingPebbleOpts() *pebble.Options { - cache := pebble.NewCache(1 << 20) - return DefaultPebbleOptions(cache, registers.NewMVCCComparer()) } -// Todo: Move these functions to somewhere common, this is from checkpoint_v6_test.go -func createSimpleTrie(t *testing.T) []*trie.MTrie { - emptyTrie := trie.NewEmptyMTrie() - +func simpleTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.RegisterID) { p1 := testutils.PathByUint8(0) - v1 := testutils.LightPayload8('A', 'a') - p2 := testutils.PathByUint8(1) - v2 := testutils.LightPayload8('B', 'b') - paths := []ledger.Path{p1, p2} - payloads := []ledger.Payload{*v1, *v2} - + payloads := RandomRegisterPayloads(2) + // collect register IDs to return + rID := make([]*flow.RegisterID, 0, 2) + for _, payload := range payloads { + key, err := payload.Key() + require.NoError(t, err) + regID, err := keyToRegisterID(key) + require.NoError(t, err) + rID = append(rID, regID) + } + emptyTrie := trie.NewEmptyMTrie() updatedTrie, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true) require.NoError(t, err) tries := []*trie.MTrie{emptyTrie, updatedTrie} - return tries + return tries, rID } -func randPathPayload() (ledger.Path, ledger.Payload) { - var p ledger.Path - _, err := rand.Read(p[:]) - if err != nil { - panic("randomness failed") +// RegisterPayloadFixture a single payload with a key structure of an execution state register value +func RegisterPayloadFixture() *ledger.Payload { + k := ledger.Key{KeyParts: []ledger.KeyPart{ + {Type: state.KeyPartOwner, Value: []byte{byte('o')}}, + {Type: state.KeyPartKey, Value: []byte{byte('k')}}}, } - payload := testutils.RandomPayload(1, 100) - return p, *payload + v := ledger.Value{uint8(0)} + return ledger.NewPayload(k, v) } -func randNPathPayloads(n int) ([]ledger.Path, []ledger.Payload) { - paths := make([]ledger.Path, n) - payloads := make([]ledger.Payload, n) +func RandomRegisterPayloads(n int) []*ledger.Payload { + p := make([]*ledger.Payload, 0, n) for i := 0; i < n; i++ { - p, payload := randPathPayload() - paths[i] = p - payloads[i] = payload - } - return paths, payloads -} - -func createMultipleRandomTriesMini(t *testing.T) []*trie.MTrie { - tries := make([]*trie.MTrie, 0) - activeTrie := trie.NewEmptyMTrie() - - var err error - // add tries with no shared paths - for i := 0; i < 5; i++ { - paths, payloads := randNPathPayloads(20) - activeTrie, _, err = trie.NewTrieWithUpdatedRegisters(activeTrie, paths, payloads, false) - require.NoError(t, err, "update registers") - tries = append(tries, activeTrie) + k := ledger.Key{KeyParts: []ledger.KeyPart{ + {Type: state.KeyPartOwner, Value: []byte{byte('o')}}, + {Type: state.KeyPartKey, Value: []byte{byte('k')}}}, + } + v := ledger.Value{uint8(0)} + pl := ledger.NewPayload(k, v) + p = append(p, pl) } - - // add trie with some shared path - sharedPaths, payloads1 := randNPathPayloads(10) - activeTrie, _, err = trie.NewTrieWithUpdatedRegisters(activeTrie, sharedPaths, payloads1, false) - require.NoError(t, err, "update registers") - tries = append(tries, activeTrie) - - _, payloads2 := randNPathPayloads(10) - activeTrie, _, err = trie.NewTrieWithUpdatedRegisters(activeTrie, sharedPaths, payloads2, false) - require.NoError(t, err, "update registers") - tries = append(tries, activeTrie) - - return tries + return p } diff --git a/storage/pebble/lookup.go b/storage/pebble/lookup.go index 1b192247015..01c0285f642 100644 --- a/storage/pebble/lookup.go +++ b/storage/pebble/lookup.go @@ -125,13 +125,13 @@ func (h lookupKey) Bytes() []byte { return h.encoded } -// EncodedUint64 encodes uint64 for storing as a pebble payload +// encodedUint64 encodes uint64 for storing as a pebble payload func encodedUint64(height uint64) []byte { payload := make([]byte, 0, 8) return binary.BigEndian.AppendUint64(payload, height) } -// KeyToRegisterID converts a ledger key into a register ID. +// keyToRegisterID converts a ledger key into a register ID. func keyToRegisterID(key ledger.Key) (flow.RegisterID, error) { if len(key.KeyParts) != 2 || key.KeyParts[0].Type != state.KeyPartOwner || From 7197fbb2c7508d81e341e5ac6424cbfb0988cfe2 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Wed, 27 Sep 2023 11:39:47 -0400 Subject: [PATCH 15/31] fix helper functions --- storage/pebble/bootstrap_test.go | 60 +++++++++++++------------------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 1a90a5ab6ee..19ce876a61a 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -1,6 +1,7 @@ package pebble import ( + "encoding/binary" "io" "os" "path" @@ -13,34 +14,27 @@ import ( "github.com/onflow/flow-go/ledger/complete/mtrie/trie" "github.com/onflow/flow-go/ledger/complete/wal" "github.com/onflow/flow-go/model/flow" - "github.com/rs/zerolog" - "github.com/stretchr/testify/require" - "github.com/onflow/flow-go/storage/pebble/registers" "github.com/onflow/flow-go/utils/unittest" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" ) func TestBootstrap_NewBootstrap(t *testing.T) { - t.Parallel() sampleDir := path.Join(unittest.TempDir(t), "checkpoint.checkpoint") rootHeight := uint64(1) log := zerolog.New(io.Discard) cache := pebble.NewCache(1 << 20) defer cache.Unref() opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - unittest.RunWithConfiguredPebbleInstance(t, opts, func(p *pebble.DB) { - // no issues when pebble instance is blank - _, err := NewBootstrap(p, sampleDir, rootHeight, log) - require.NoError(t, err) - }) - unittest.RunWithConfiguredPebbleInstance(t, opts, func(p *pebble.DB) { - // set heights - require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil)) - require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil)) - // errors if FirstHeight or LastHeight are populated - _, err := NewBootstrap(p, sampleDir, rootHeight, log) - require.ErrorContains(t, err, "cannot bootstrap populated DB") - }) + p, dir := unittest.TempPebbleDBWithOpts(t, opts) + // set heights + require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil)) + require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil)) + // errors if FirstHeight or LastHeight are populated + _, err := NewBootstrap(p, sampleDir, rootHeight, log) + require.ErrorContains(t, err, "cannot bootstrap populated DB") + require.NoError(t, os.RemoveAll(dir)) } func TestBootstrap_IndexCheckpointFile_Happy(t *testing.T) { @@ -69,7 +63,9 @@ func TestBootstrap_IndexCheckpointFile_Happy(t *testing.T) { require.Equal(t, reg.FirstHeight(), rootHeight) for _, register := range registerIDs { - err, value := reg.Get(*register, rootHeight) + val, err := reg.Get(*register, rootHeight) + require.NoError(t, err) + require.Equal(t, val, []byte{uint8(0)}) } require.NoError(t, pb.Close()) @@ -129,7 +125,7 @@ func simpleTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.Regist require.NoError(t, err) regID, err := keyToRegisterID(key) require.NoError(t, err) - rID = append(rID, regID) + rID = append(rID, ®ID) } emptyTrie := trie.NewEmptyMTrie() updatedTrie, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true) @@ -138,26 +134,18 @@ func simpleTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.Regist return tries, rID } -// RegisterPayloadFixture a single payload with a key structure of an execution state register value -func RegisterPayloadFixture() *ledger.Payload { - k := ledger.Key{KeyParts: []ledger.KeyPart{ - {Type: state.KeyPartOwner, Value: []byte{byte('o')}}, - {Type: state.KeyPartKey, Value: []byte{byte('k')}}}, - } - v := ledger.Value{uint8(0)} - return ledger.NewPayload(k, v) -} - -func RandomRegisterPayloads(n int) []*ledger.Payload { - p := make([]*ledger.Payload, 0, n) - for i := 0; i < n; i++ { +func RandomRegisterPayloads(n uint64) []ledger.Payload { + p := make([]ledger.Payload, 0, n) + for i := uint64(0); i < n; i++ { + o := make([]byte, 0, 8) + o = binary.BigEndian.AppendUint64(o, n) k := ledger.Key{KeyParts: []ledger.KeyPart{ - {Type: state.KeyPartOwner, Value: []byte{byte('o')}}, - {Type: state.KeyPartKey, Value: []byte{byte('k')}}}, - } + {Type: state.KeyPartOwner, Value: o}, + {Type: state.KeyPartKey, Value: o}, + }} v := ledger.Value{uint8(0)} pl := ledger.NewPayload(k, v) - p = append(p, pl) + p = append(p, *pl) } return p } From 8411e4316131b15a2206524431b1f5b8a73bc311 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Wed, 27 Sep 2023 11:41:56 -0400 Subject: [PATCH 16/31] remainign test cases --- storage/pebble/bootstrap_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 19ce876a61a..9cd34c240cc 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -21,6 +21,7 @@ import ( ) func TestBootstrap_NewBootstrap(t *testing.T) { + t.Parallel() sampleDir := path.Join(unittest.TempDir(t), "checkpoint.checkpoint") rootHeight := uint64(1) log := zerolog.New(io.Discard) @@ -113,6 +114,11 @@ func TestBootstrap_IndexCheckpointFile_Error(t *testing.T) { } +func TestBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { + t.Parallel() + +} + func simpleTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.RegisterID) { p1 := testutils.PathByUint8(0) p2 := testutils.PathByUint8(1) From 70185be821b7fa519c86e8e0bf4f4d6e2a18b003 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Wed, 27 Sep 2023 12:19:58 -0400 Subject: [PATCH 17/31] complete failure test --- storage/pebble/bootstrap_test.go | 73 +++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 9cd34c240cc..0f820a73c2f 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -106,6 +106,35 @@ func TestBootstrap_IndexCheckpointFile_Empty(t *testing.T) { func TestBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { t.Parallel() + pa1 := testutils.PathByUint8(0) + pa2 := testutils.PathByUint8(1) + rootHeight := uint64(666) + pl1 := testutils.LightPayload8('A', 'A') + pl2 := testutils.LightPayload('B', 'B') + paths := []ledger.Path{pa1, pa2} + payloads := []ledger.Payload{*pl1, *pl2} + emptyTrie := trie.NewEmptyMTrie() + trieWithInvalidEntry, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true) + require.NoError(t, err) + log := zerolog.New(io.Discard) + + unittest.RunWithTempDir(t, func(dir string) { + fileName := "invalid-checkpoint" + require.NoErrorf(t, wal.StoreCheckpointV6Concurrently([]*trie.MTrie{trieWithInvalidEntry}, dir, fileName, log), + "fail to store checkpoint") + checkpointFile := path.Join(dir, fileName) + cache := pebble.NewCache(1 << 20) + opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) + defer cache.Unref() + + pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) + bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) + require.NoError(t, err) + err = bootstrap.IndexCheckpointFile() + require.ErrorContains(t, err, "key not in expected format") + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) + }) } @@ -116,6 +145,38 @@ func TestBootstrap_IndexCheckpointFile_Error(t *testing.T) { func TestBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { t.Parallel() + log := zerolog.New(io.Discard) + rootHeight := uint64(10000) + unittest.RunWithTempDir(t, func(dir string) { + tries, registerIDs := simpleTrieWithValidRegisterIDs(t) + fileName := "simple-checkpoint" + require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") + checkpointFile := path.Join(dir, fileName) + cache := pebble.NewCache(1 << 20) + opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) + defer cache.Unref() + pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) + bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) + require.NoError(t, err) + err = bootstrap.IndexCheckpointFile() + require.NoError(t, err) + + // create registers instance and check values + reg, err := NewRegisters(pb) + require.NoError(t, err) + + require.Equal(t, reg.LatestHeight(), rootHeight) + require.Equal(t, reg.FirstHeight(), rootHeight) + + for _, register := range registerIDs { + val, err := reg.Get(*register, rootHeight) + require.NoError(t, err) + require.Equal(t, val, []byte{uint8(0)}) + } + + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) + }) } @@ -123,7 +184,7 @@ func simpleTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.Regist p1 := testutils.PathByUint8(0) p2 := testutils.PathByUint8(1) paths := []ledger.Path{p1, p2} - payloads := RandomRegisterPayloads(2) + payloads := randomRegisterPayloads(2) // collect register IDs to return rID := make([]*flow.RegisterID, 0, 2) for _, payload := range payloads { @@ -140,7 +201,11 @@ func simpleTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.Regist return tries, rID } -func RandomRegisterPayloads(n uint64) []ledger.Payload { +func largeTrieWithValidRegisterIDs(t *testing.T) { + +} + +func randomRegisterPayloads(n uint64) []ledger.Payload { p := make([]ledger.Payload, 0, n) for i := uint64(0); i < n; i++ { o := make([]byte, 0, 8) @@ -155,3 +220,7 @@ func RandomRegisterPayloads(n uint64) []ledger.Payload { } return p } + +func randomRegisterPaths(n uint64) []ledger.Path { + return nil +} From 1648f3fb864bc87e2069eb8d50ffa584a3efa9d1 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Wed, 27 Sep 2023 21:06:37 -0400 Subject: [PATCH 18/31] update batch tests and godocs --- storage/pebble/bootstrap.go | 74 ++++++++++++------------- storage/pebble/bootstrap_test.go | 92 ++++++++++++++++++-------------- 2 files changed, 87 insertions(+), 79 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 1a70e7f9d8e..494b274fc41 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -6,13 +6,13 @@ import ( "path/filepath" "github.com/cockroachdb/pebble" - "github.com/onflow/flow-go/ledger/complete/wal" - "github.com/onflow/flow-go/module/component" - "github.com/onflow/flow-go/module/irrecoverable" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" + + "github.com/onflow/flow-go/ledger/complete/wal" ) -type Bootstrap struct { +type RegisterBootstrap struct { checkpointDir string checkpointFileName string log zerolog.Logger @@ -21,7 +21,15 @@ type Bootstrap struct { rootHeight uint64 } -func NewBootstrap(db *pebble.DB, checkpointFile string, rootHeight uint64, log zerolog.Logger) (*Bootstrap, error) { +// NewRegisterBootstrap creates the bootstrap object for reading checkpoint data and the height tracker in pebble +// This object must be initialized and RegisterBootstrap.IndexCheckpointFile must be run to have the pebble db instance +// in the correct state to initialize a Registers store. +func NewRegisterBootstrap( + db *pebble.DB, + checkpointFile string, + rootHeight uint64, + log zerolog.Logger, +) (*RegisterBootstrap, error) { // check for pre-populated heights, fail if it is populated // i.e. the IndexCheckpointFile function has already run for the db in this directory checkpointDir, checkpointFileName := filepath.Split(checkpointFile) @@ -30,7 +38,7 @@ func NewBootstrap(db *pebble.DB, checkpointFile string, rootHeight uint64, log z // key detected, attempt to run bootstrap on corrupt or already bootstrapped data return nil, fmt.Errorf("found latest key set on badger instance, cannot bootstrap populated DB") } - return &Bootstrap{ + return &RegisterBootstrap{ checkpointDir: checkpointDir, checkpointFileName: checkpointFileName, log: log, @@ -40,7 +48,8 @@ func NewBootstrap(db *pebble.DB, checkpointFile string, rootHeight uint64, log z }, nil } -func (b *Bootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error { +// batchIndexRegisters +func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error { batch := b.db.NewBatch() defer batch.Close() for _, register := range leafNodes { @@ -70,13 +79,7 @@ func (b *Bootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error { // indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir // with wal.OpenAndReadLeafNodesFromCheckpointV6 -func (b *Bootstrap) indexCheckpointFileWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - ready() - select { - case <-ctx.Done(): - return - default: - } +func (b *RegisterBootstrap) indexCheckpointFileWorker() error { // collect leaf nodes to batch index until the channel is closed batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) for leafNode := range b.leafNodeChan { @@ -84,56 +87,47 @@ func (b *Bootstrap) indexCheckpointFileWorker(ctx irrecoverable.SignalerContext, if len(batch) >= pebbleBootstrapRegisterBatchLen { err := b.batchIndexRegisters(batch) if err != nil { - ctx.Throw(fmt.Errorf("unable to index registers to pebble in batch: %w", err)) + return fmt.Errorf("unable to index registers to pebble in batch: %w", err) } batch = make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) } } + // index the remaining registers if didn't reach a batch length. err := b.batchIndexRegisters(batch) if err != nil { - ctx.Throw(fmt.Errorf("unable to index remaining registers to pebble: %w", err)) + return fmt.Errorf("unable to index remaining registers to pebble: %w", err) } + return nil } -// IndexCheckpointFile indexes the checkpoint file in the Dir provided as a component -func (b *Bootstrap) IndexCheckpointFile() error { - ctx := context.Background() - sigCtx, errChan := irrecoverable.WithSignaler(ctx) - // index checkpoint file async - cmb := component.NewComponentManagerBuilder() +// IndexCheckpointFile indexes the checkpoint file in the Dir provided +func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error { + g, _ := errgroup.WithContext(ctx) for i := 0; i < pebbleBootstrapWorkerCount; i++ { - // create workers to read and index registers - cmb.AddWorker(b.indexCheckpointFileWorker) + g.Go(func() error { + return b.indexCheckpointFileWorker() + }) } err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log) if err != nil { // error in reading a leaf node return fmt.Errorf("error reading leaf node: %w", err) } - c := cmb.Build() - c.Start(sigCtx) - // wait for the indexing to finish before populating heights - <-c.Done() - - select { - case procErr := <-errChan: - return fmt.Errorf("failed to index checkpoint file: %w", procErr) - default: + if err = g.Wait(); err != nil { + return fmt.Errorf("failed to index checkpoint file: %w", err) } - - bat := b.db.NewBatch() - defer bat.Close() + batch := b.db.NewBatch() + defer batch.Close() // update heights atomically to prevent one getting populated without the other - // leaving it in a corrupted state - err = bat.Set(firstHeightKey(), encodedUint64(b.rootHeight), nil) + err = batch.Set(firstHeightKey(), encodedUint64(b.rootHeight), nil) if err != nil { return fmt.Errorf("unable to add first height to batch: %w", err) } - err = bat.Set(latestHeightKey(), encodedUint64(b.rootHeight), nil) + err = batch.Set(latestHeightKey(), encodedUint64(b.rootHeight), nil) if err != nil { return fmt.Errorf("unable to add latest height to batch: %w", err) } - err = bat.Commit(pebble.Sync) + err = batch.Commit(pebble.Sync) if err != nil { return fmt.Errorf("unable to index first and latest heights: %w", err) } diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 0f820a73c2f..f5fc58304f5 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -1,6 +1,7 @@ package pebble import ( + "context" "encoding/binary" "io" "os" @@ -8,6 +9,9 @@ import ( "testing" "github.com/cockroachdb/pebble" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/engine/execution/state" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/testutils" @@ -16,11 +20,9 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/storage/pebble/registers" "github.com/onflow/flow-go/utils/unittest" - "github.com/rs/zerolog" - "github.com/stretchr/testify/require" ) -func TestBootstrap_NewBootstrap(t *testing.T) { +func TestRegisterBootstrap_NewBootstrap(t *testing.T) { t.Parallel() sampleDir := path.Join(unittest.TempDir(t), "checkpoint.checkpoint") rootHeight := uint64(1) @@ -33,12 +35,12 @@ func TestBootstrap_NewBootstrap(t *testing.T) { require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil)) require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil)) // errors if FirstHeight or LastHeight are populated - _, err := NewBootstrap(p, sampleDir, rootHeight, log) + _, err := NewRegisterBootstrap(p, sampleDir, rootHeight, log) require.ErrorContains(t, err, "cannot bootstrap populated DB") require.NoError(t, os.RemoveAll(dir)) } -func TestBootstrap_IndexCheckpointFile_Happy(t *testing.T) { +func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) { t.Parallel() log := zerolog.New(io.Discard) rootHeight := uint64(10000) @@ -47,13 +49,15 @@ func TestBootstrap_IndexCheckpointFile_Happy(t *testing.T) { fileName := "simple-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) + cache := pebble.NewCache(1 << 20) opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) defer cache.Unref() pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) - bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) + + bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) - err = bootstrap.IndexCheckpointFile() + err = bootstrap.IndexCheckpointFile(context.Background()) require.NoError(t, err) // create registers instance and check values @@ -74,7 +78,7 @@ func TestBootstrap_IndexCheckpointFile_Happy(t *testing.T) { }) } -func TestBootstrap_IndexCheckpointFile_Empty(t *testing.T) { +func TestRegisterBootstrap_IndexCheckpointFile_Empty(t *testing.T) { t.Parallel() log := zerolog.New(io.Discard) rootHeight := uint64(10000) @@ -87,9 +91,9 @@ func TestBootstrap_IndexCheckpointFile_Empty(t *testing.T) { opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) defer cache.Unref() pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) - bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) + bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) - err = bootstrap.IndexCheckpointFile() + err = bootstrap.IndexCheckpointFile(context.Background()) require.NoError(t, err) // create registers instance and check values @@ -104,7 +108,7 @@ func TestBootstrap_IndexCheckpointFile_Empty(t *testing.T) { }) } -func TestBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { +func TestRegisterBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { t.Parallel() pa1 := testutils.PathByUint8(0) pa2 := testutils.PathByUint8(1) @@ -128,9 +132,9 @@ func TestBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { defer cache.Unref() pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) - bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) + bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) - err = bootstrap.IndexCheckpointFile() + err = bootstrap.IndexCheckpointFile(context.Background()) require.ErrorContains(t, err, "key not in expected format") require.NoError(t, pb.Close()) require.NoError(t, os.RemoveAll(dbDir)) @@ -138,27 +142,27 @@ func TestBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { } -func TestBootstrap_IndexCheckpointFile_Error(t *testing.T) { +func TestRegisterBootstrap_IndexCheckpointFile_CorruptedCheckpointFile(t *testing.T) { t.Parallel() } -func TestBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { +func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { t.Parallel() log := zerolog.New(io.Discard) rootHeight := uint64(10000) unittest.RunWithTempDir(t, func(dir string) { - tries, registerIDs := simpleTrieWithValidRegisterIDs(t) - fileName := "simple-checkpoint" + tries, registerIDs := largeTrieWithValidRegisterIDs(t) + fileName := "large-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) cache := pebble.NewCache(1 << 20) opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) defer cache.Unref() pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) - bootstrap, err := NewBootstrap(pb, checkpointFile, rootHeight, log) + bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) - err = bootstrap.IndexCheckpointFile() + err = bootstrap.IndexCheckpointFile(context.Background()) require.NoError(t, err) // create registers instance and check values @@ -181,39 +185,45 @@ func TestBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { } func simpleTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.RegisterID) { - p1 := testutils.PathByUint8(0) - p2 := testutils.PathByUint8(1) - paths := []ledger.Path{p1, p2} - payloads := randomRegisterPayloads(2) - // collect register IDs to return - rID := make([]*flow.RegisterID, 0, 2) + return trieWithValidRegisterIDs(t, 2) +} + +func largeTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.RegisterID) { + // large enough trie so every worker should have something to index + largeTrieSize := 2 * pebbleBootstrapRegisterBatchLen * pebbleBootstrapWorkerCount + return trieWithValidRegisterIDs(t, uint16(largeTrieSize)) +} + +func trieWithValidRegisterIDs(t *testing.T, n uint16) ([]*trie.MTrie, []*flow.RegisterID) { + emptyTrie := trie.NewEmptyMTrie() + resultRegisterIDs := make([]*flow.RegisterID, 0, n) + paths := randomRegisterPaths(n) + payloads := randomRegisterPayloads(n) for _, payload := range payloads { key, err := payload.Key() require.NoError(t, err) regID, err := keyToRegisterID(key) require.NoError(t, err) - rID = append(rID, ®ID) + resultRegisterIDs = append(resultRegisterIDs, ®ID) } - emptyTrie := trie.NewEmptyMTrie() - updatedTrie, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true) + populatedTrie, depth, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true) + // make sure it has at least 1 leaf node + require.GreaterOrEqual(t, depth, uint16(1)) require.NoError(t, err) - tries := []*trie.MTrie{emptyTrie, updatedTrie} - return tries, rID + resultTries := []*trie.MTrie{emptyTrie, populatedTrie} + return resultTries, resultRegisterIDs } -func largeTrieWithValidRegisterIDs(t *testing.T) { - -} - -func randomRegisterPayloads(n uint64) []ledger.Payload { +func randomRegisterPayloads(n uint16) []ledger.Payload { p := make([]ledger.Payload, 0, n) - for i := uint64(0); i < n; i++ { + for i := uint16(0); i < n; i++ { o := make([]byte, 0, 8) - o = binary.BigEndian.AppendUint64(o, n) + o = binary.BigEndian.AppendUint16(o, n) k := ledger.Key{KeyParts: []ledger.KeyPart{ {Type: state.KeyPartOwner, Value: o}, {Type: state.KeyPartKey, Value: o}, }} + // values are always 0 for ease of testing/checking v := ledger.Value{uint8(0)} pl := ledger.NewPayload(k, v) p = append(p, *pl) @@ -221,6 +231,10 @@ func randomRegisterPayloads(n uint64) []ledger.Payload { return p } -func randomRegisterPaths(n uint64) []ledger.Path { - return nil +func randomRegisterPaths(n uint16) []ledger.Path { + p := make([]ledger.Path, 0, n) + for i := uint16(0); i < n; i++ { + p = append(p, testutils.PathByUint16(i)) + } + return p } From 7584015dcec1b95f6f4bd2de62c126a84671c9aa Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Wed, 27 Sep 2023 21:27:07 -0400 Subject: [PATCH 19/31] add corrupted file check and debug logs --- storage/pebble/bootstrap.go | 5 ++++- storage/pebble/bootstrap_test.go | 21 ++++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 494b274fc41..0961b9d2848 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -50,6 +50,7 @@ func NewRegisterBootstrap( // batchIndexRegisters func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error { + b.log.Debug().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes ") batch := b.db.NewBatch() defer batch.Close() for _, register := range leafNodes { @@ -80,6 +81,7 @@ func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error // indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir // with wal.OpenAndReadLeafNodesFromCheckpointV6 func (b *RegisterBootstrap) indexCheckpointFileWorker() error { + b.log.Debug().Msg("started checkpoint index worker") // collect leaf nodes to batch index until the channel is closed batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) for leafNode := range b.leafNodeChan { @@ -103,6 +105,7 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker() error { // IndexCheckpointFile indexes the checkpoint file in the Dir provided func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error { g, _ := errgroup.WithContext(ctx) + b.log.Debug().Msg("indexing checkpoint file for pebble register store") for i := 0; i < pebbleBootstrapWorkerCount; i++ { g.Go(func() error { return b.indexCheckpointFileWorker() @@ -110,12 +113,12 @@ func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error { } err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log) if err != nil { - // error in reading a leaf node return fmt.Errorf("error reading leaf node: %w", err) } if err = g.Wait(); err != nil { return fmt.Errorf("failed to index checkpoint file: %w", err) } + b.log.Debug().Msg("checkpoint indexing complete") batch := b.db.NewBatch() defer batch.Close() // update heights atomically to prevent one getting populated without the other diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index f5fc58304f5..03a60369e8f 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -3,6 +3,7 @@ package pebble import ( "context" "encoding/binary" + "fmt" "io" "os" "path" @@ -144,7 +145,25 @@ func TestRegisterBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { func TestRegisterBootstrap_IndexCheckpointFile_CorruptedCheckpointFile(t *testing.T) { t.Parallel() - + rootHeight := uint64(666) + log := zerolog.New(io.Discard) + unittest.RunWithTempDir(t, func(dir string) { + tries, _ := largeTrieWithValidRegisterIDs(t) + checkpointFileName := "large-checkpoint-incomplete" + require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, checkpointFileName, log), "fail to store checkpoint") + // delete 2nd part of the file (2nd subtrie) + fileToDelete := path.Join(dir, fmt.Sprintf("%v.%03d", checkpointFileName, 2)) + err := os.RemoveAll(fileToDelete) + require.NoError(t, err) + cache := pebble.NewCache(1 << 20) + opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) + defer cache.Unref() + pb, _ := unittest.TempPebbleDBWithOpts(t, opts) + bootstrap, err := NewRegisterBootstrap(pb, checkpointFileName, rootHeight, log) + require.NoError(t, err) + err = bootstrap.IndexCheckpointFile(context.Background()) + require.ErrorIs(t, err, os.ErrNotExist) + }) } func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { From 8474efdefef265c2aa7b4e7427a71e277a3c85e6 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Thu, 28 Sep 2023 13:34:09 -0400 Subject: [PATCH 20/31] initial changes per feedback --- storage/pebble/bootstrap.go | 51 ++++++++++++++++---------------- storage/pebble/bootstrap_test.go | 38 +++++++++++++----------- storage/pebble/constants.go | 6 ++++ 3 files changed, 51 insertions(+), 44 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 0961b9d2848..663ec7977f0 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -38,10 +38,15 @@ func NewRegisterBootstrap( // key detected, attempt to run bootstrap on corrupt or already bootstrapped data return nil, fmt.Errorf("found latest key set on badger instance, cannot bootstrap populated DB") } + // populate first height + err = db.Set(firstHeightKey(), encodedUint64(rootHeight), nil) + if err != nil { + return nil, fmt.Errorf("could not index first height key to initialize: %w", err) + } return &RegisterBootstrap{ checkpointDir: checkpointDir, checkpointFileName: checkpointFileName, - log: log, + log: log.With().Str("module", "register_bootstrap").Logger(), db: db, leafNodeChan: make(chan *wal.LeafNode, checkpointLeafNodeBufSize), rootHeight: rootHeight, @@ -80,18 +85,23 @@ func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error // indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir // with wal.OpenAndReadLeafNodesFromCheckpointV6 -func (b *RegisterBootstrap) indexCheckpointFileWorker() error { - b.log.Debug().Msg("started checkpoint index worker") +func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error { + b.log.Info().Msg("started checkpoint index worker") // collect leaf nodes to batch index until the channel is closed batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) for leafNode := range b.leafNodeChan { - batch = append(batch, leafNode) - if len(batch) >= pebbleBootstrapRegisterBatchLen { - err := b.batchIndexRegisters(batch) - if err != nil { - return fmt.Errorf("unable to index registers to pebble in batch: %w", err) + select { + case <-ctx.Done(): + return nil + default: + batch = append(batch, leafNode) + if len(batch) >= pebbleBootstrapRegisterBatchLen { + err := b.batchIndexRegisters(batch) + if err != nil { + return fmt.Errorf("unable to index registers to pebble in batch: %w", err) + } + batch = make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) } - batch = make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen) } } // index the remaining registers if didn't reach a batch length. @@ -104,11 +114,11 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker() error { // IndexCheckpointFile indexes the checkpoint file in the Dir provided func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error { - g, _ := errgroup.WithContext(ctx) - b.log.Debug().Msg("indexing checkpoint file for pebble register store") + g, gCtx := errgroup.WithContext(ctx) + b.log.Info().Msg("indexing checkpoint file for pebble register store") for i := 0; i < pebbleBootstrapWorkerCount; i++ { g.Go(func() error { - return b.indexCheckpointFileWorker() + return b.indexCheckpointFileWorker(gCtx) }) } err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log) @@ -118,21 +128,10 @@ func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error { if err = g.Wait(); err != nil { return fmt.Errorf("failed to index checkpoint file: %w", err) } - b.log.Debug().Msg("checkpoint indexing complete") - batch := b.db.NewBatch() - defer batch.Close() - // update heights atomically to prevent one getting populated without the other - err = batch.Set(firstHeightKey(), encodedUint64(b.rootHeight), nil) - if err != nil { - return fmt.Errorf("unable to add first height to batch: %w", err) - } - err = batch.Set(latestHeightKey(), encodedUint64(b.rootHeight), nil) - if err != nil { - return fmt.Errorf("unable to add latest height to batch: %w", err) - } - err = batch.Commit(pebble.Sync) + b.log.Info().Msg("checkpoint indexing complete") + err = b.db.Set(latestHeightKey(), encodedUint64(b.rootHeight), nil) if err != nil { - return fmt.Errorf("unable to index first and latest heights: %w", err) + return fmt.Errorf("could not index latest height: %w", err) } return nil } diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 03a60369e8f..38b498e83c9 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -23,22 +23,24 @@ import ( "github.com/onflow/flow-go/utils/unittest" ) +const defaultRegisterValue = byte('v') + func TestRegisterBootstrap_NewBootstrap(t *testing.T) { t.Parallel() - sampleDir := path.Join(unittest.TempDir(t), "checkpoint.checkpoint") - rootHeight := uint64(1) - log := zerolog.New(io.Discard) - cache := pebble.NewCache(1 << 20) - defer cache.Unref() - opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - p, dir := unittest.TempPebbleDBWithOpts(t, opts) - // set heights - require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil)) - require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil)) - // errors if FirstHeight or LastHeight are populated - _, err := NewRegisterBootstrap(p, sampleDir, rootHeight, log) - require.ErrorContains(t, err, "cannot bootstrap populated DB") - require.NoError(t, os.RemoveAll(dir)) + unittest.RunWithTempDir(t, func(dir string) { + rootHeight := uint64(1) + log := zerolog.New(io.Discard) + cache := pebble.NewCache(1 << 20) + defer cache.Unref() + opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) + p, dir := unittest.TempPebbleDBWithOpts(t, opts) + // set heights + require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil)) + require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil)) + // errors if FirstHeight or LastHeight are populated + _, err := NewRegisterBootstrap(p, dir, rootHeight, log) + require.ErrorContains(t, err, "cannot bootstrap populated DB") + }) } func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) { @@ -71,7 +73,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) { for _, register := range registerIDs { val, err := reg.Get(*register, rootHeight) require.NoError(t, err) - require.Equal(t, val, []byte{uint8(0)}) + require.Equal(t, val, []byte{defaultRegisterValue}) } require.NoError(t, pb.Close()) @@ -194,7 +196,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { for _, register := range registerIDs { val, err := reg.Get(*register, rootHeight) require.NoError(t, err) - require.Equal(t, val, []byte{uint8(0)}) + require.Equal(t, val, []byte{defaultRegisterValue}) } require.NoError(t, pb.Close()) @@ -242,8 +244,8 @@ func randomRegisterPayloads(n uint16) []ledger.Payload { {Type: state.KeyPartOwner, Value: o}, {Type: state.KeyPartKey, Value: o}, }} - // values are always 0 for ease of testing/checking - v := ledger.Value{uint8(0)} + // values are always 'v' for ease of testing/checking + v := ledger.Value{defaultRegisterValue} pl := ledger.NewPayload(k, v) p = append(p, *pl) } diff --git a/storage/pebble/constants.go b/storage/pebble/constants.go index 09d8ebfe36f..2f866c89a48 100644 --- a/storage/pebble/constants.go +++ b/storage/pebble/constants.go @@ -3,10 +3,16 @@ package pebble import "github.com/onflow/flow-go/storage/pebble/registers" const ( + // checkpointLeafNodeBufSize is the batch size of leaf nodes being read from the checkpoint file, + // for use by wal.OpenAndReadLeafNodesFromCheckpointV6 checkpointLeafNodeBufSize = 1000 + // pebbleBootstrapRegisterBatchLen is the batch size of converted register values to be written to pebble by the + // register bootstrap process pebbleBootstrapRegisterBatchLen = 1000 + // pebbleBootstrapWorkerCount is the maximum number of concurrent goroutines that read and index + // checkpoint leaf nodes pebbleBootstrapWorkerCount = 10 // placeHolderHeight is an element of the height lookup keys of length HeightSuffixLen From cde601c30042e2b4596cb3c6beb472b75e0398c0 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Thu, 28 Sep 2023 13:42:30 -0400 Subject: [PATCH 21/31] changes after merge with master --- storage/pebble/bootstrap.go | 5 +++-- storage/pebble/bootstrap_test.go | 10 +++++----- storage/pebble/lookup.go | 16 ---------------- 3 files changed, 8 insertions(+), 23 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 663ec7977f0..dd144c7d3bb 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -6,6 +6,7 @@ import ( "path/filepath" "github.com/cockroachdb/pebble" + "github.com/onflow/flow-go/ledger/common/convert" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" @@ -55,7 +56,7 @@ func NewRegisterBootstrap( // batchIndexRegisters func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error { - b.log.Debug().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes ") + b.log.Debug().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes") batch := b.db.NewBatch() defer batch.Close() for _, register := range leafNodes { @@ -65,7 +66,7 @@ func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error return fmt.Errorf("could not get key from register payload: %w", err) } - registerID, err := keyToRegisterID(key) + registerID, err := convert.LedgerKeyToRegisterID(key) if err != nil { return fmt.Errorf("could not get register ID from key: %w", err) } diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 38b498e83c9..4dcc4177c5d 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -13,8 +13,8 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/require" - "github.com/onflow/flow-go/engine/execution/state" "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/ledger/common/testutils" "github.com/onflow/flow-go/ledger/complete/mtrie/trie" "github.com/onflow/flow-go/ledger/complete/wal" @@ -138,7 +138,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) - require.ErrorContains(t, err, "key not in expected format") + require.ErrorContains(t, err, "unexpected ledger key format") require.NoError(t, pb.Close()) require.NoError(t, os.RemoveAll(dbDir)) }) @@ -223,7 +223,7 @@ func trieWithValidRegisterIDs(t *testing.T, n uint16) ([]*trie.MTrie, []*flow.Re for _, payload := range payloads { key, err := payload.Key() require.NoError(t, err) - regID, err := keyToRegisterID(key) + regID, err := convert.LedgerKeyToRegisterID(key) require.NoError(t, err) resultRegisterIDs = append(resultRegisterIDs, ®ID) } @@ -241,8 +241,8 @@ func randomRegisterPayloads(n uint16) []ledger.Payload { o := make([]byte, 0, 8) o = binary.BigEndian.AppendUint16(o, n) k := ledger.Key{KeyParts: []ledger.KeyPart{ - {Type: state.KeyPartOwner, Value: o}, - {Type: state.KeyPartKey, Value: o}, + {Type: convert.KeyPartOwner, Value: o}, + {Type: convert.KeyPartKey, Value: o}, }} // values are always 'v' for ease of testing/checking v := ledger.Value{defaultRegisterValue} diff --git a/storage/pebble/lookup.go b/storage/pebble/lookup.go index 01c0285f642..b404ffbe52e 100644 --- a/storage/pebble/lookup.go +++ b/storage/pebble/lookup.go @@ -5,8 +5,6 @@ import ( "encoding/binary" "fmt" - "github.com/onflow/flow-go/engine/execution/state" - "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/storage/pebble/registers" ) @@ -130,17 +128,3 @@ func encodedUint64(height uint64) []byte { payload := make([]byte, 0, 8) return binary.BigEndian.AppendUint64(payload, height) } - -// keyToRegisterID converts a ledger key into a register ID. -func keyToRegisterID(key ledger.Key) (flow.RegisterID, error) { - if len(key.KeyParts) != 2 || - key.KeyParts[0].Type != state.KeyPartOwner || - key.KeyParts[1].Type != state.KeyPartKey { - return flow.RegisterID{}, fmt.Errorf("key not in expected format: %s", key.String()) - } - - return flow.RegisterID{ - Owner: string(key.KeyParts[0].Value), - Key: string(key.KeyParts[1].Value), - }, nil -} From b869d24d779aa1e5c64a2d2f09bd9a07b7b15a77 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Thu, 28 Sep 2023 14:17:42 -0400 Subject: [PATCH 22/31] lint --- storage/pebble/bootstrap.go | 3 +-- storage/pebble/bootstrap_test.go | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index dd144c7d3bb..853e5d0bfd7 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -6,10 +6,10 @@ import ( "path/filepath" "github.com/cockroachdb/pebble" - "github.com/onflow/flow-go/ledger/common/convert" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" + "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/ledger/complete/wal" ) @@ -39,7 +39,6 @@ func NewRegisterBootstrap( // key detected, attempt to run bootstrap on corrupt or already bootstrapped data return nil, fmt.Errorf("found latest key set on badger instance, cannot bootstrap populated DB") } - // populate first height err = db.Set(firstHeightKey(), encodedUint64(rootHeight), nil) if err != nil { return nil, fmt.Errorf("could not index first height key to initialize: %w", err) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 4dcc4177c5d..8b80e01bff0 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -33,12 +33,13 @@ func TestRegisterBootstrap_NewBootstrap(t *testing.T) { cache := pebble.NewCache(1 << 20) defer cache.Unref() opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - p, dir := unittest.TempPebbleDBWithOpts(t, opts) + p, err := pebble.Open(dir, opts) + require.NoError(t, err) // set heights require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil)) require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil)) // errors if FirstHeight or LastHeight are populated - _, err := NewRegisterBootstrap(p, dir, rootHeight, log) + _, err = NewRegisterBootstrap(p, dir, rootHeight, log) require.ErrorContains(t, err, "cannot bootstrap populated DB") }) } From 673d08ad4121d7f3045fbdf6c42c324159d93bc2 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Thu, 28 Sep 2023 14:32:09 -0400 Subject: [PATCH 23/31] cleanup --- storage/pebble/bootstrap.go | 1 - 1 file changed, 1 deletion(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 853e5d0bfd7..daf2d04996d 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -53,7 +53,6 @@ func NewRegisterBootstrap( }, nil } -// batchIndexRegisters func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error { b.log.Debug().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes") batch := b.db.NewBatch() From e2ca59b7ded578c1ff29d4db4dbb53f7e9893540 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Fri, 29 Sep 2023 09:03:34 -0400 Subject: [PATCH 24/31] incorporate open DB interface --- storage/pebble/bootstrap.go | 23 ++++++++------- storage/pebble/bootstrap_test.go | 50 ++++++++++++-------------------- storage/pebble/open.go | 2 +- 3 files changed, 32 insertions(+), 43 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index daf2d04996d..3b4e8733eb3 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -33,16 +33,15 @@ func NewRegisterBootstrap( ) (*RegisterBootstrap, error) { // check for pre-populated heights, fail if it is populated // i.e. the IndexCheckpointFile function has already run for the db in this directory - checkpointDir, checkpointFileName := filepath.Split(checkpointFile) - _, _, err := db.Get(latestHeightKey()) - if err == nil { - // key detected, attempt to run bootstrap on corrupt or already bootstrapped data - return nil, fmt.Errorf("found latest key set on badger instance, cannot bootstrap populated DB") - } - err = db.Set(firstHeightKey(), encodedUint64(rootHeight), nil) + isBootstrapped, err := IsBootstrapped(db) if err != nil { - return nil, fmt.Errorf("could not index first height key to initialize: %w", err) + return nil, err } + if isBootstrapped { + // key detected, attempt to run bootstrap on corrupt or already bootstrapped data + return nil, fmt.Errorf("found latest key set on badger instance, DB is already bootstrapped") + } + checkpointDir, checkpointFileName := filepath.Split(checkpointFile) return &RegisterBootstrap{ checkpointDir: checkpointDir, checkpointFileName: checkpointFileName, @@ -113,7 +112,8 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error // IndexCheckpointFile indexes the checkpoint file in the Dir provided func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error { - g, gCtx := errgroup.WithContext(ctx) + cct, cancel := context.WithCancel(ctx) + g, gCtx := errgroup.WithContext(cct) b.log.Info().Msg("indexing checkpoint file for pebble register store") for i := 0; i < pebbleBootstrapWorkerCount; i++ { g.Go(func() error { @@ -122,13 +122,16 @@ func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error { } err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log) if err != nil { + cancel() return fmt.Errorf("error reading leaf node: %w", err) } if err = g.Wait(); err != nil { + cancel() return fmt.Errorf("failed to index checkpoint file: %w", err) } + cancel() b.log.Info().Msg("checkpoint indexing complete") - err = b.db.Set(latestHeightKey(), encodedUint64(b.rootHeight), nil) + err = initHeights(b.db, b.rootHeight) if err != nil { return fmt.Errorf("could not index latest height: %w", err) } diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 8b80e01bff0..7306a4038f0 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -10,17 +10,15 @@ import ( "testing" "github.com/cockroachdb/pebble" - "github.com/rs/zerolog" - "github.com/stretchr/testify/require" - "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/ledger/common/testutils" "github.com/onflow/flow-go/ledger/complete/mtrie/trie" "github.com/onflow/flow-go/ledger/complete/wal" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/storage/pebble/registers" "github.com/onflow/flow-go/utils/unittest" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" ) const defaultRegisterValue = byte('v') @@ -30,17 +28,13 @@ func TestRegisterBootstrap_NewBootstrap(t *testing.T) { unittest.RunWithTempDir(t, func(dir string) { rootHeight := uint64(1) log := zerolog.New(io.Discard) - cache := pebble.NewCache(1 << 20) - defer cache.Unref() - opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - p, err := pebble.Open(dir, opts) + p, err := OpenRegisterPebbleDB(dir) require.NoError(t, err) // set heights - require.NoError(t, p.Set(firstHeightKey(), encodedUint64(rootHeight), nil)) - require.NoError(t, p.Set(latestHeightKey(), encodedUint64(rootHeight), nil)) + require.NoError(t, initHeights(p, rootHeight)) // errors if FirstHeight or LastHeight are populated _, err = NewRegisterBootstrap(p, dir, rootHeight, log) - require.ErrorContains(t, err, "cannot bootstrap populated DB") + require.ErrorContains(t, err, "DB is already bootstrapped") }) } @@ -53,11 +47,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) { fileName := "simple-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - - cache := pebble.NewCache(1 << 20) - opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - defer cache.Unref() - pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) + pb, dbDir := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) @@ -91,10 +81,8 @@ func TestRegisterBootstrap_IndexCheckpointFile_Empty(t *testing.T) { fileName := "empty-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - cache := pebble.NewCache(1 << 20) - opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - defer cache.Unref() - pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) + pb, dbDir := createPebbleForTest(t) + bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) @@ -131,11 +119,8 @@ func TestRegisterBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { require.NoErrorf(t, wal.StoreCheckpointV6Concurrently([]*trie.MTrie{trieWithInvalidEntry}, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - cache := pebble.NewCache(1 << 20) - opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - defer cache.Unref() + pb, dbDir := createPebbleForTest(t) - pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) @@ -158,10 +143,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_CorruptedCheckpointFile(t *testin fileToDelete := path.Join(dir, fmt.Sprintf("%v.%03d", checkpointFileName, 2)) err := os.RemoveAll(fileToDelete) require.NoError(t, err) - cache := pebble.NewCache(1 << 20) - opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - defer cache.Unref() - pb, _ := unittest.TempPebbleDBWithOpts(t, opts) + pb, _ := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFileName, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) @@ -178,10 +160,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { fileName := "large-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - cache := pebble.NewCache(1 << 20) - opts := DefaultPebbleOptions(cache, registers.NewMVCCComparer()) - defer cache.Unref() - pb, dbDir := unittest.TempPebbleDBWithOpts(t, opts) + pb, dbDir := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) @@ -260,3 +239,10 @@ func randomRegisterPaths(n uint16) []ledger.Path { } return p } + +func createPebbleForTest(t *testing.T) (*pebble.DB, string) { + dbDir := unittest.TempPebblePath(t) + pb, err := OpenRegisterPebbleDB(dbDir) + require.NoError(t, err) + return pb, dbDir +} diff --git a/storage/pebble/open.go b/storage/pebble/open.go index f36cbf2a5ac..a0d7ea6c0d5 100644 --- a/storage/pebble/open.go +++ b/storage/pebble/open.go @@ -33,7 +33,7 @@ func NewBootstrappedRegistersWithPath(dir string) (*Registers, *pebble.DB, error return registers, db, nil } -// openRegisterPebbleDB opens the database +// OpenRegisterPebbleDB opens the database func OpenRegisterPebbleDB(dir string) (*pebble.DB, error) { cache := pebble.NewCache(1 << 20) defer cache.Unref() From 15ffb8deba0449a4651157bdb57e3fe31e0c8f36 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Fri, 29 Sep 2023 09:08:01 -0400 Subject: [PATCH 25/31] lint --- storage/pebble/bootstrap_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 7306a4038f0..3b8a4126815 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -10,6 +10,8 @@ import ( "testing" "github.com/cockroachdb/pebble" + "github.com/rs/zerolog" + "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/ledger/common/testutils" @@ -17,7 +19,6 @@ import ( "github.com/onflow/flow-go/ledger/complete/wal" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" - "github.com/rs/zerolog" "github.com/stretchr/testify/require" ) From 199bfe05b055eeade7baa45d13fde87d47bcc09e Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Fri, 29 Sep 2023 09:09:30 -0400 Subject: [PATCH 26/31] lint --- storage/pebble/bootstrap_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 3b8a4126815..1fc010f4bcc 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/rs/zerolog" + "github.com/stretchr/testify/require" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" @@ -19,7 +20,6 @@ import ( "github.com/onflow/flow-go/ledger/complete/wal" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" - "github.com/stretchr/testify/require" ) const defaultRegisterValue = byte('v') From 0423af7ced118a9afdc150bbc1e316844c2aec49 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Fri, 29 Sep 2023 09:17:29 -0400 Subject: [PATCH 27/31] defer cancel --- storage/pebble/bootstrap.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 3b4e8733eb3..88ff4eaf3c1 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -113,6 +113,7 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error // IndexCheckpointFile indexes the checkpoint file in the Dir provided func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error { cct, cancel := context.WithCancel(ctx) + defer cancel() g, gCtx := errgroup.WithContext(cct) b.log.Info().Msg("indexing checkpoint file for pebble register store") for i := 0; i < pebbleBootstrapWorkerCount; i++ { @@ -122,14 +123,11 @@ func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error { } err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log) if err != nil { - cancel() return fmt.Errorf("error reading leaf node: %w", err) } if err = g.Wait(); err != nil { - cancel() return fmt.Errorf("failed to index checkpoint file: %w", err) } - cancel() b.log.Info().Msg("checkpoint indexing complete") err = initHeights(b.db, b.rootHeight) if err != nil { From e32e912930c336067da454d124aed9959946625a Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Fri, 29 Sep 2023 11:01:44 -0400 Subject: [PATCH 28/31] add sentinel error for pre-bootstrapped pebble --- storage/pebble/bootstrap.go | 5 ++++- storage/pebble/bootstrap_test.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 88ff4eaf3c1..436653cab73 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -13,6 +13,9 @@ import ( "github.com/onflow/flow-go/ledger/complete/wal" ) +// ErrAlreadyBootstrapped is the sentinel error for an already bootstrapped pebble instance +var ErrAlreadyBootstrapped = fmt.Errorf("found latest key set on badger instance, DB is already bootstrapped") + type RegisterBootstrap struct { checkpointDir string checkpointFileName string @@ -39,7 +42,7 @@ func NewRegisterBootstrap( } if isBootstrapped { // key detected, attempt to run bootstrap on corrupt or already bootstrapped data - return nil, fmt.Errorf("found latest key set on badger instance, DB is already bootstrapped") + return nil, ErrAlreadyBootstrapped } checkpointDir, checkpointFileName := filepath.Split(checkpointFile) return &RegisterBootstrap{ diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 1fc010f4bcc..2692f2c82b6 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -35,7 +35,7 @@ func TestRegisterBootstrap_NewBootstrap(t *testing.T) { require.NoError(t, initHeights(p, rootHeight)) // errors if FirstHeight or LastHeight are populated _, err = NewRegisterBootstrap(p, dir, rootHeight, log) - require.ErrorContains(t, err, "DB is already bootstrapped") + require.ErrorIs(t, err, ErrAlreadyBootstrapped) }) } From 94db5b2366eb0299e6abe44c3cba006fda58a8c8 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Fri, 29 Sep 2023 11:56:42 -0400 Subject: [PATCH 29/31] add cleanup fn for pebble --- storage/pebble/bootstrap.go | 3 ++- storage/pebble/bootstrap_test.go | 29 +++++++++++------------------ 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go index 436653cab73..ecb18ec6b43 100644 --- a/storage/pebble/bootstrap.go +++ b/storage/pebble/bootstrap.go @@ -2,6 +2,7 @@ package pebble import ( "context" + "errors" "fmt" "path/filepath" @@ -14,7 +15,7 @@ import ( ) // ErrAlreadyBootstrapped is the sentinel error for an already bootstrapped pebble instance -var ErrAlreadyBootstrapped = fmt.Errorf("found latest key set on badger instance, DB is already bootstrapped") +var ErrAlreadyBootstrapped = errors.New("found latest key set on badger instance, DB is already bootstrapped") type RegisterBootstrap struct { checkpointDir string diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 2692f2c82b6..caaea0ff3f0 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -48,7 +48,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) { fileName := "simple-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - pb, dbDir := createPebbleForTest(t) + pb := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) @@ -67,9 +67,6 @@ func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) { require.NoError(t, err) require.Equal(t, val, []byte{defaultRegisterValue}) } - - require.NoError(t, pb.Close()) - require.NoError(t, os.RemoveAll(dbDir)) }) } @@ -82,7 +79,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_Empty(t *testing.T) { fileName := "empty-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - pb, dbDir := createPebbleForTest(t) + pb := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) @@ -95,9 +92,6 @@ func TestRegisterBootstrap_IndexCheckpointFile_Empty(t *testing.T) { require.Equal(t, reg.LatestHeight(), rootHeight) require.Equal(t, reg.FirstHeight(), rootHeight) - - require.NoError(t, pb.Close()) - require.NoError(t, os.RemoveAll(dbDir)) }) } @@ -120,14 +114,12 @@ func TestRegisterBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { require.NoErrorf(t, wal.StoreCheckpointV6Concurrently([]*trie.MTrie{trieWithInvalidEntry}, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - pb, dbDir := createPebbleForTest(t) + pb := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) require.ErrorContains(t, err, "unexpected ledger key format") - require.NoError(t, pb.Close()) - require.NoError(t, os.RemoveAll(dbDir)) }) } @@ -144,7 +136,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_CorruptedCheckpointFile(t *testin fileToDelete := path.Join(dir, fmt.Sprintf("%v.%03d", checkpointFileName, 2)) err := os.RemoveAll(fileToDelete) require.NoError(t, err) - pb, _ := createPebbleForTest(t) + pb := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFileName, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) @@ -161,7 +153,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { fileName := "large-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - pb, dbDir := createPebbleForTest(t) + pb := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) @@ -179,9 +171,6 @@ func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { require.NoError(t, err) require.Equal(t, val, []byte{defaultRegisterValue}) } - - require.NoError(t, pb.Close()) - require.NoError(t, os.RemoveAll(dbDir)) }) } @@ -241,9 +230,13 @@ func randomRegisterPaths(n uint16) []ledger.Path { return p } -func createPebbleForTest(t *testing.T) (*pebble.DB, string) { +func createPebbleForTest(t *testing.T) *pebble.DB { dbDir := unittest.TempPebblePath(t) pb, err := OpenRegisterPebbleDB(dbDir) require.NoError(t, err) - return pb, dbDir + t.Cleanup(func() { + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) + }) + return pb } From 47f4ad9780023b2b686555cc80a6a27eb05cdd19 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Fri, 29 Sep 2023 12:33:45 -0400 Subject: [PATCH 30/31] undo cleanup it's closing the pebble instance before usage --- storage/pebble/bootstrap_test.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index caaea0ff3f0..2692f2c82b6 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -48,7 +48,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) { fileName := "simple-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - pb := createPebbleForTest(t) + pb, dbDir := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) @@ -67,6 +67,9 @@ func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) { require.NoError(t, err) require.Equal(t, val, []byte{defaultRegisterValue}) } + + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) }) } @@ -79,7 +82,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_Empty(t *testing.T) { fileName := "empty-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - pb := createPebbleForTest(t) + pb, dbDir := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) @@ -92,6 +95,9 @@ func TestRegisterBootstrap_IndexCheckpointFile_Empty(t *testing.T) { require.Equal(t, reg.LatestHeight(), rootHeight) require.Equal(t, reg.FirstHeight(), rootHeight) + + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) }) } @@ -114,12 +120,14 @@ func TestRegisterBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) { require.NoErrorf(t, wal.StoreCheckpointV6Concurrently([]*trie.MTrie{trieWithInvalidEntry}, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - pb := createPebbleForTest(t) + pb, dbDir := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) require.ErrorContains(t, err, "unexpected ledger key format") + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) }) } @@ -136,7 +144,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_CorruptedCheckpointFile(t *testin fileToDelete := path.Join(dir, fmt.Sprintf("%v.%03d", checkpointFileName, 2)) err := os.RemoveAll(fileToDelete) require.NoError(t, err) - pb := createPebbleForTest(t) + pb, _ := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFileName, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) @@ -153,7 +161,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { fileName := "large-checkpoint" require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint") checkpointFile := path.Join(dir, fileName) - pb := createPebbleForTest(t) + pb, dbDir := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) @@ -171,6 +179,9 @@ func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) { require.NoError(t, err) require.Equal(t, val, []byte{defaultRegisterValue}) } + + require.NoError(t, pb.Close()) + require.NoError(t, os.RemoveAll(dbDir)) }) } @@ -230,13 +241,9 @@ func randomRegisterPaths(n uint16) []ledger.Path { return p } -func createPebbleForTest(t *testing.T) *pebble.DB { +func createPebbleForTest(t *testing.T) (*pebble.DB, string) { dbDir := unittest.TempPebblePath(t) pb, err := OpenRegisterPebbleDB(dbDir) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, pb.Close()) - require.NoError(t, os.RemoveAll(dbDir)) - }) - return pb + return pb, dbDir } From 0532094cf5d606ecb07398b4d7914580de4dc651 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Fri, 29 Sep 2023 12:40:12 -0400 Subject: [PATCH 31/31] add final rmdir --- storage/pebble/bootstrap_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/pebble/bootstrap_test.go b/storage/pebble/bootstrap_test.go index 2692f2c82b6..ad2bcfba0ad 100644 --- a/storage/pebble/bootstrap_test.go +++ b/storage/pebble/bootstrap_test.go @@ -144,11 +144,12 @@ func TestRegisterBootstrap_IndexCheckpointFile_CorruptedCheckpointFile(t *testin fileToDelete := path.Join(dir, fmt.Sprintf("%v.%03d", checkpointFileName, 2)) err := os.RemoveAll(fileToDelete) require.NoError(t, err) - pb, _ := createPebbleForTest(t) + pb, dbDir := createPebbleForTest(t) bootstrap, err := NewRegisterBootstrap(pb, checkpointFileName, rootHeight, log) require.NoError(t, err) err = bootstrap.IndexCheckpointFile(context.Background()) require.ErrorIs(t, err, os.ErrNotExist) + require.NoError(t, os.RemoveAll(dbDir)) }) }