Skip to content

Commit

Permalink
Problem: async commit queue size not configurable
Browse files Browse the repository at this point in the history
Solution:
- change the bool option to int

rename

WaitAsyncCommit
  • Loading branch information
yihuang committed May 17, 2023
1 parent 7f20b83 commit ace73e8
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 52 deletions.
5 changes: 3 additions & 2 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ type MemIAVLConfig struct {
// ZeroCopy defines if the memiavl should return slices pointing to mmap-ed buffers directly (zero-copy),
// the zero-copied slices must not be retained beyond current block's execution.
ZeroCopy bool `mapstructure:"zero-copy"`
// AsyncCommit defines if the memiavl should commit asynchronously, this greatly improve block catching-up performance.
AsyncCommit bool `mapstructure:"async-commit"`
// AsyncCommitBuffer defines the size of asynchronous commit queue, this greatly improve block catching-up
// performance, -1 means synchronous commit.
AsyncCommitBuffer int `mapstructure:"async-commit-buffer"`
}

func DefaultMemIAVLConfig() MemIAVLConfig {
Expand Down
5 changes: 3 additions & 2 deletions app/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ enable = {{ .MemIAVL.Enable }}
# the zero-copied slices must not be retained beyond current block's execution.
zero-copy = {{ .MemIAVL.ZeroCopy }}
# AsyncCommit defines if the memiavl should commit asynchronously, this greatly improve block catching-up performance.
async-commit = {{ .MemIAVL.AsyncCommit }}
# AsyncCommitBuffer defines the size of asynchronous commit queue, this greatly improve block catching-up
# performance, -1 means synchronous commit.
async-commit-buffer = {{ .MemIAVL.AsyncCommitBuffer }}
`
8 changes: 4 additions & 4 deletions app/memiavl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import (
)

const (
FlagMemIAVL = "memiavl.enable"
FlagAsyncCommit = "memiavl.async-commit"
FlagZeroCopy = "memiavl.zero-copy"
FlagMemIAVL = "memiavl.enable"
FlagAsyncCommitBuffer = "memiavl.async-commit-buffer"
FlagZeroCopy = "memiavl.zero-copy"
)

func SetupMemIAVL(logger log.Logger, homePath string, appOpts servertypes.AppOptions, baseAppOptions []func(*baseapp.BaseApp)) []func(*baseapp.BaseApp) {
if cast.ToBool(appOpts.Get(FlagMemIAVL)) {
// cms must be overridden before the other options, because they may use the cms,
// make sure the cms aren't be overridden by the other options later on.
cms := rootmulti.NewStore(filepath.Join(homePath, "data", "memiavl.db"), logger)
cms.SetAsyncCommit(cast.ToBool(appOpts.Get(FlagAsyncCommit)))
cms.SetAsyncCommitBuffer(cast.ToInt(appOpts.Get(FlagAsyncCommitBuffer)))
cms.SetZeroCopy(cast.ToBool(appOpts.Get(FlagZeroCopy)))
baseAppOptions = append([]func(*baseapp.BaseApp){setCMS(cms)}, baseAppOptions...)
}
Expand Down
91 changes: 56 additions & 35 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type DB struct {
pruneSnapshotLock sync.Mutex

// invariant: the LastIndex always match the current version of MultiTree
wal *wal.Log
walChan chan *walEntry
walQuit chan error
wal *wal.Log
walChanSize int
walChan chan *walEntry
walQuit chan error

// pending store upgrades, will be written into WAL in next Commit call
pendingUpgrades []*TreeNameUpgrade
Expand All @@ -63,8 +64,8 @@ type Options struct {
SnapshotKeepRecent uint32
// load the target version instead of latest version
TargetVersion uint32
// Write WAL asynchronously, it's ok in blockchain case because we can always replay the raw blocks.
AsyncCommit bool
// Buffer size for the asynchronous commit queue, -1 means synchronous commit
AsyncCommitBuffer int
// ZeroCopy if true, the get and iterator methods could return a slice pointing to mmaped blob files.
ZeroCopy bool
}
Expand Down Expand Up @@ -97,36 +98,11 @@ func Load(dir string, opts Options) (*DB, error) {
return nil, err
}

var (
walChan chan *walEntry
walQuit chan error
)
if opts.AsyncCommit {
walChan = make(chan *walEntry, 100)
walQuit = make(chan error)
go func() {
defer close(walQuit)

for entry := range walChan {
bz, err := entry.data.Marshal()
if err != nil {
walQuit <- err
return
}
if err := wal.Write(entry.index, bz); err != nil {
walQuit <- err
return
}
}
}()
}

db := &DB{
MultiTree: *mtree,
dir: dir,
wal: wal,
walChan: walChan,
walQuit: walQuit,
walChanSize: opts.AsyncCommitBuffer,
snapshotKeepRecent: opts.SnapshotKeepRecent,
}

Expand Down Expand Up @@ -179,13 +155,13 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error {
// checkAsyncTasks checks the status of background tasks non-blocking-ly and process the result
func (db *DB) checkAsyncTasks() error {
return errors.Join(
db.checkAsyncWAL(),
db.checkAsyncCommit(),
db.checkBackgroundSnapshotRewrite(),
)
}

// checkAsyncWAL check the quit signal of async wal writing
func (db *DB) checkAsyncWAL() error {
// checkAsyncCommit check the quit signal of async wal writing
func (db *DB) checkAsyncCommit() error {
select {
case err := <-db.walQuit:
// async wal writing failed, we need to abort the state machine
Expand Down Expand Up @@ -269,7 +245,11 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
Changesets: changeSets,
Upgrades: db.pendingUpgrades,
}}
if db.walChan != nil {
if db.walChanSize >= 0 {
if db.walChan == nil {
db.initAsyncCommit()
}

// async wal writing
db.walChan <- &entry
} else {
Expand All @@ -288,6 +268,47 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
return hash, v, nil
}

func (db *DB) initAsyncCommit() {
walChan := make(chan *walEntry, db.walChanSize)
walQuit := make(chan error)

go func() {
defer close(walQuit)

for entry := range walChan {
bz, err := entry.data.Marshal()
if err != nil {
walQuit <- err
return
}
if err := db.wal.Write(entry.index, bz); err != nil {
walQuit <- err
return
}
}
}()

db.walChan = walChan
db.walQuit = walQuit
}

// WaitAsyncCommit waits for the completion of async commit
func (db *DB) WaitAsyncCommit() error {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.walChan == nil {
return nil
}

close(db.walChan)
err := <-db.walQuit

db.walChan = nil
db.walQuit = nil
return err
}

func (db *DB) Copy() *DB {
db.mtx.Lock()
defer db.mtx.Unlock()
Expand Down
22 changes: 13 additions & 9 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type Store struct {

interBlockCache types.MultiStorePersistentCache

asyncCommit bool
zeroCopy bool
asyncCommitBuffer int
zeroCopy bool
}

func NewStore(dir string, logger log.Logger) *Store {
Expand Down Expand Up @@ -92,6 +92,10 @@ func (rs *Store) Commit() types.CommitID {
return rs.lastCommitInfo.CommitID()
}

func (rs *Store) WaitAsyncCommit() error {
return rs.db.WaitAsyncCommit()
}

// Implements interface Committer
func (rs *Store) LastCommitID() types.CommitID {
return rs.lastCommitInfo.CommitID()
Expand Down Expand Up @@ -266,11 +270,11 @@ func (rs *Store) LoadVersionAndUpgrade(version int64, upgrades *types.StoreUpgra
}
}
db, err := memiavl.Load(rs.dir, memiavl.Options{
CreateIfMissing: true,
InitialStores: initialStores,
TargetVersion: uint32(version),
AsyncCommit: rs.asyncCommit,
ZeroCopy: rs.zeroCopy,
CreateIfMissing: true,
InitialStores: initialStores,
TargetVersion: uint32(version),
AsyncCommitBuffer: rs.asyncCommitBuffer,
ZeroCopy: rs.zeroCopy,
})
if err != nil {
return errors.Wrapf(err, "fail to load memiavl at %s", rs.dir)
Expand Down Expand Up @@ -384,8 +388,8 @@ func (rs *Store) SetIAVLDisableFastNode(disable bool) {
func (rs *Store) SetLazyLoading(lazyLoading bool) {
}

func (rs *Store) SetAsyncCommit(async bool) {
rs.asyncCommit = async
func (rs *Store) SetAsyncCommitBuffer(size int) {
rs.asyncCommitBuffer = size
}

func (rs *Store) SetZeroCopy(zeroCopy bool) {
Expand Down

0 comments on commit ace73e8

Please sign in to comment.