Skip to content

Commit

Permalink
Applied PR suggestions regarding cache size
Browse files Browse the repository at this point in the history
  • Loading branch information
durkmurder committed Dec 7, 2023
1 parent 6c81ef2 commit 337c066
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 11 deletions.
3 changes: 2 additions & 1 deletion cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,8 @@ func (fnb *FlowNodeBuilder) initStorage() error {
setups := bstorage.NewEpochSetups(fnb.Metrics.Cache, fnb.DB)
epochCommits := bstorage.NewEpochCommits(fnb.Metrics.Cache, fnb.DB)
commits := bstorage.NewCommits(fnb.Metrics.Cache, fnb.DB)
protocolState := bstorage.NewProtocolState(fnb.Metrics.Cache, setups, epochCommits, fnb.DB, bstorage.DefaultCacheSize)
protocolState := bstorage.NewProtocolState(fnb.Metrics.Cache, setups, epochCommits, fnb.DB,
bstorage.DefaultProtocolStateCacheSize, bstorage.DefaultProtocolStateByBlockIDCacheSize)
versionBeacons := bstorage.NewVersionBeacons(fnb.DB)

fnb.Storage = Storage{
Expand Down
3 changes: 2 additions & 1 deletion consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ func createNode(
qcsDB := storage.NewQuorumCertificates(metricsCollector, db, storage.DefaultCacheSize)
setupsDB := storage.NewEpochSetups(metricsCollector, db)
commitsDB := storage.NewEpochCommits(metricsCollector, db)
protocolStateDB := storage.NewProtocolState(metricsCollector, setupsDB, commitsDB, db, storage.DefaultCacheSize)
protocolStateDB := storage.NewProtocolState(metricsCollector, setupsDB, commitsDB, db,
storage.DefaultProtocolStateCacheSize, storage.DefaultProtocolStateByBlockIDCacheSize)
versionBeaconDB := storage.NewVersionBeacons(db)
protocolStateEvents := events.NewDistributor()

Expand Down
3 changes: 2 additions & 1 deletion integration/testnet/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ func (c *Container) OpenState() (*state.State, error) {
qcs := storage.NewQuorumCertificates(metrics, db, storage.DefaultCacheSize)
setups := storage.NewEpochSetups(metrics, db)
commits := storage.NewEpochCommits(metrics, db)
protocolState := storage.NewProtocolState(metrics, setups, commits, db, storage.DefaultCacheSize)
protocolState := storage.NewProtocolState(metrics, setups, commits, db,
storage.DefaultProtocolStateCacheSize, storage.DefaultProtocolStateByBlockIDCacheSize)
versionBeacons := storage.NewVersionBeacons(db)

return state.OpenState(
Expand Down
3 changes: 2 additions & 1 deletion storage/badger/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func InitAll(metrics module.CacheMetrics, db *badger.DB) *storage.All {
qcs := NewQuorumCertificates(metrics, db, DefaultCacheSize)
setups := NewEpochSetups(metrics, db)
epochCommits := NewEpochCommits(metrics, db)
protocolState := NewProtocolState(metrics, setups, epochCommits, db, DefaultCacheSize)
protocolState := NewProtocolState(metrics, setups, epochCommits, db,
DefaultProtocolStateCacheSize, DefaultProtocolStateByBlockIDCacheSize)
versionBeacons := NewVersionBeacons(db)

commits := NewCommits(metrics, db)
Expand Down
16 changes: 13 additions & 3 deletions storage/badger/protocol_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ import (
"github.com/onflow/flow-go/storage/badger/transaction"
)

// DefaultProtocolStateCacheSize is the default size for primary protocol state cache.
// Minimally, we have 3 entries per epoch (one on epoch Switchover, one on receiving the Epoch Setup and one when seeing the Epoch Commit event).
// Lets be generous and assume we have 20 different Protocol States per epoch.
var DefaultProtocolStateCacheSize uint = 20

// DefaultProtocolStateByBlockIDCacheSize is the default value for secondary byBlockIdCache.
// We want to be able to cover a broad interval of views without cache misses, so we use a bigger value.
var DefaultProtocolStateByBlockIDCacheSize uint = 1000

// ProtocolState implements persistent storage for storing Protocol States.
// Protocol state uses an embedded cache without storing capabilities(store happens on first retrieval) to avoid unnecessary
// operations and to speed up access to frequently used Protocol State.
Expand Down Expand Up @@ -59,7 +68,8 @@ func NewProtocolState(collector module.CacheMetrics,
epochSetups storage.EpochSetups,
epochCommits storage.EpochCommits,
db *badger.DB,
cacheSize uint,
stateCacheSize uint,
stateByBlockIDCacheSize uint,
) *ProtocolState {
retrieveByProtocolStateID := func(protocolStateID flow.Identifier) func(tx *badger.Txn) (*flow.RichProtocolStateEntry, error) {
var protocolStateEntry flow.ProtocolStateEntry
Expand Down Expand Up @@ -100,11 +110,11 @@ func NewProtocolState(collector module.CacheMetrics,
return &ProtocolState{
db: db,
cache: newCache[flow.Identifier, *flow.RichProtocolStateEntry](collector, metrics.ResourceProtocolState,
withLimit[flow.Identifier, *flow.RichProtocolStateEntry](cacheSize),
withLimit[flow.Identifier, *flow.RichProtocolStateEntry](stateCacheSize),
withStore(noopStore[flow.Identifier, *flow.RichProtocolStateEntry]),
withRetrieve(retrieveByProtocolStateID)),
byBlockIdCache: newCache[flow.Identifier, flow.Identifier](collector, metrics.ResourceProtocolStateByBlockID,
withLimit[flow.Identifier, flow.Identifier](cacheSize),
withLimit[flow.Identifier, flow.Identifier](stateByBlockIDCacheSize),
withStore(storeByBlockID),
withRetrieve(retrieveByBlockID)),
}
Expand Down
8 changes: 4 additions & 4 deletions storage/badger/protocol_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestProtocolStateStorage(t *testing.T) {

setups := NewEpochSetups(metrics, db)
commits := NewEpochCommits(metrics, db)
store := NewProtocolState(metrics, setups, commits, db, DefaultCacheSize)
store := NewProtocolState(metrics, setups, commits, db, DefaultProtocolStateCacheSize, DefaultProtocolStateByBlockIDCacheSize)

expected := unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState())
protocolStateID := expected.ID()
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestProtocolStateStoreInvalidProtocolState(t *testing.T) {
metrics := metrics.NewNoopCollector()
setups := NewEpochSetups(metrics, db)
commits := NewEpochCommits(metrics, db)
store := NewProtocolState(metrics, setups, commits, db, DefaultCacheSize)
store := NewProtocolState(metrics, setups, commits, db, DefaultProtocolStateCacheSize, DefaultProtocolStateByBlockIDCacheSize)
invalid := unittest.ProtocolStateFixture().ProtocolStateEntry
// swap first and second elements to break canonical order
invalid.CurrentEpoch.ActiveIdentities[0], invalid.CurrentEpoch.ActiveIdentities[1] = invalid.CurrentEpoch.ActiveIdentities[1], invalid.CurrentEpoch.ActiveIdentities[0]
Expand All @@ -99,7 +99,7 @@ func TestProtocolStateMergeParticipants(t *testing.T) {

setups := NewEpochSetups(metrics, db)
commits := NewEpochCommits(metrics, db)
store := NewProtocolState(metrics, setups, commits, db, DefaultCacheSize)
store := NewProtocolState(metrics, setups, commits, db, DefaultProtocolStateCacheSize, DefaultProtocolStateByBlockIDCacheSize)

stateEntry := unittest.ProtocolStateFixture()
// change address of participant in current epoch, so we can distinguish it from the one in previous epoch
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestProtocolStateRootSnapshot(t *testing.T) {

setups := NewEpochSetups(metrics, db)
commits := NewEpochCommits(metrics, db)
store := NewProtocolState(metrics, setups, commits, db, DefaultCacheSize)
store := NewProtocolState(metrics, setups, commits, db, DefaultProtocolStateCacheSize, DefaultProtocolStateByBlockIDCacheSize)
expected := unittest.RootProtocolStateFixture()

protocolStateID := expected.ID()
Expand Down

0 comments on commit 337c066

Please sign in to comment.