Skip to content

Commit

Permalink
concurrent commit poc
Browse files Browse the repository at this point in the history
- stanadlone metrics server to collect metrics during comet replay
- fix usage of sqlite metadata kv
  • Loading branch information
kocubinski committed Nov 30, 2023
1 parent 54d461d commit ec67f4e
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 83 deletions.
11 changes: 10 additions & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/store"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/rootmulti"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
Expand Down Expand Up @@ -405,6 +406,7 @@ func (app *BaseApp) setCheckState(header tmproto.Header) {
// Commit.
func (app *BaseApp) setDeliverState(header tmproto.Header) {
ms := app.cms.CacheMultiStore()
ms = ms.(cachemulti.Store).SetConcurrentCommit(1)
app.deliverState = &state{
ms: ms,
ctx: sdk.NewContext(ms, header, false, app.logger),
Expand Down Expand Up @@ -759,9 +761,16 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, mode runTxMode) (*s
)

if handler := app.msgServiceRouter.Handler(msg); handler != nil {
eventMsgName = sdk.MsgTypeURL(msg)
//if eventMsgName == "/cosmwasm.wasm.v1.MsgExecuteContract" {
//fmt.Println("MsgExecuteContract")
//gaskv.StartLogging()
//}
// ADR 031 request type routing
msgResult, err = handler(ctx, msg)
eventMsgName = sdk.MsgTypeURL(msg)
//if eventMsgName == "/ibc.applications.transfer.v1.MsgTransfer" {
//gaskv.StopLogging()
//}
} else if legacyMsg, ok := msg.(legacytx.LegacyMsg); ok {
// legacy sdk.Msg routing
// Assuming that the app developer has migrated all their Msgs to
Expand Down
18 changes: 14 additions & 4 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
return err
}

// create a standalone metrics server to collect metrics during block replay (if applicable)
//stadaloneMetrics, err := telemetry.NewStandaloneMetrics(config.Telemetry)
//if err != nil {
// return err
//}
//metricsCtx, cancel := context.WithCancel(context.Background())
//stadaloneMetrics.StartServer(metricsCtx, ":1317")

genDocProvider := node.DefaultGenesisDocProviderFunc(cfg)
tmNode, err := node.NewNode(
cfg,
Expand All @@ -274,8 +282,10 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
ctx.Logger,
)
if err != nil {
//cancel()
return err
}
//cancel()

ctx.Logger.Debug("initialization: tmNode created")
if err := tmNode.Start(); err != nil {
Expand Down Expand Up @@ -419,14 +429,14 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
}

defer func() {
if tmNode.IsRunning() {
_ = tmNode.Stop()
}

if cpuProfileCleanup != nil {
cpuProfileCleanup()
}

if tmNode.IsRunning() {
_ = tmNode.Stop()
}

if apiSrv != nil {
_ = apiSrv.Close()
}
Expand Down
5 changes: 4 additions & 1 deletion server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ func InterceptConfigsPreRunHandler(cmd *cobra.Command, customAppConfigTemplate s

var logWriter io.Writer
if strings.ToLower(serverCtx.Viper.GetString(flags.FlagLogFormat)) == tmcfg.LogFormatPlain {
logWriter = zerolog.ConsoleWriter{Out: os.Stderr}
logWriter = zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: time.Stamp,
}
} else {
logWriter = os.Stderr
}
Expand Down
50 changes: 47 additions & 3 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package cachemulti
import (
"fmt"
"io"
"time"

dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
)

//----------------------------------------
Expand All @@ -26,11 +28,27 @@ type Store struct {
traceWriter io.Writer
traceContext types.TraceContext

listeners map[types.StoreKey][]types.WriteListener
listeners map[types.StoreKey][]types.WriteListener
concurrentCommit int
}

var _ types.CacheMultiStore = Store{}

type concurrentCommitResult struct {
}

var (
concurrencyLimit = 10
commitLock = make(chan struct{}, concurrencyLimit)
concurrentCommitCh = make(chan *concurrentCommitResult, concurrencyLimit)
)

func init() {
for i := 0; i < concurrencyLimit; i++ {
commitLock <- struct{}{}
}
}

// NewFromKVStore creates a new Store object from a mapping of store keys to
// CacheWrapper objects and a KVStore as the database. Each CacheWrapper store
// is a branched store.
Expand Down Expand Up @@ -134,11 +152,37 @@ func (cms Store) GetStoreType() types.StoreType {
return types.StoreTypeMulti
}

func (cms Store) SetConcurrentCommit(concurrentCommit int) types.CacheMultiStore {
cms.concurrentCommit = concurrentCommit
return cms
}

// Write calls Write on each underlying store.
func (cms Store) Write() {
if cms.concurrentCommit == -1 || cms.concurrentCommit == 1 {
defer telemetry.MeasureSince(time.Now(), "store", "cachemulti", "write")
}
cms.db.Write()
for _, store := range cms.stores {
store.Write()
if cms.concurrentCommit == 1 {
var i int
for _, store := range cms.stores {
//fmt.Println("cachemulti: commitLock", i)
i++
go func(s types.CacheWrap) {
<-commitLock
s.Write()
concurrentCommitCh <- &concurrentCommitResult{}
commitLock <- struct{}{}
}(store)
}
for ; i > 0; i-- {
<-concurrentCommitCh
}
//fmt.Println("cachemulti: concurrent write done")
} else {
for _, store := range cms.stores {
store.Write()
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions store/gaskv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ func (gs *Store) Get(key []byte) (value []byte) {
// TODO overflow-safe math?
gs.gasMeter.ConsumeGas(gs.gasConfig.ReadCostPerByte*types.Gas(len(key)), types.GasReadPerByteDesc)
debugLog("Get", "PerByteKey", gs.gasConfig.ReadCostPerByte*types.Gas(len(key)))
if DebugLogging.Get() == 1 {
fmt.Printf("GasKVStore DebugLog Get %d*%d=%d len(key)=%d\n",
gs.gasConfig.ReadCostPerByte,
types.Gas(len(key)),
gs.gasConfig.ReadCostPerByte*types.Gas(len(key)),
len(key),
)
if gs.gasConfig.ReadCostPerByte*types.Gas(len(key)) == 63 || gs.gasConfig.ReadCostPerByte*types.Gas(len(key)) == 60 {
fmt.Printf("GasKVStore DebugLog key=%s\n", key)
if gs.gasConfig.ReadCostPerByte*types.Gas(len(value)) == 147 {
fmt.Println("found it")
}
}
}
gs.gasMeter.ConsumeGas(gs.gasConfig.ReadCostPerByte*types.Gas(len(value)), types.GasReadPerByteDesc)
debugLog("Get", "PerByteValue", gs.gasConfig.ReadCostPerByte*types.Gas(len(value)))

Expand Down Expand Up @@ -174,6 +188,7 @@ func (gi *gasIterator) Valid() bool {
// in the iterator. It incurs a flat gas cost for seeking and a variable gas
// cost based on the current value's length if the iterator is valid.
func (gi *gasIterator) Next() {
defer telemetry.MeasureSince(time.Now(), "store", "gaskv", "iterator", "next")
gi.consumeSeekGas()
gi.parent.Next()
}
Expand Down
1 change: 1 addition & 0 deletions store/iavl/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (st *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []typ

// Implements types.KVStore.
func (st *Store) Set(key, value []byte) {
defer telemetry.MeasureSince(time.Now(), "store", "iavl", "set")
types.AssertValidKey(key)
types.AssertValidValue(value)
st.tree.Set(key, value)
Expand Down
29 changes: 24 additions & 5 deletions store/iavl_v2/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,33 @@ func LoadStoreWithInitialVersion(v2RootPath string, key types.StoreKey, id types
// i.e. not the happy path.
path := filepath.Join(v2RootPath, key.Name())
pool := iavl.NewNodePool()
fmt.Println("LoadStoreWithInitialVersion path:", path)
sql, err := iavl.NewSqliteDb(pool, iavl.SqliteDbOptions{Path: path})
sqlOpts := iavl.SqliteDbOptions{Path: path}
var err error
sqlOpts.MmapSize, err = sqlOpts.EstimateMmapSize()
if err != nil {
return nil, fmt.Errorf("failed to estimate mmap size for sqlite db path=%s: %w", path, err)
}
sql, err := iavl.NewSqliteDb(pool, sqlOpts)
if err != nil {
return nil, fmt.Errorf("failed to open sqlite db path=%s: %w", path, err)
}

tree := iavl.NewTree(sql, pool, iavl.TreeOptions{StateStorage: true})
//err = tree.LoadSnapshot(id.Version)
err = tree.LoadVersion(id.Version)
tree := iavl.NewTree(sql, pool, iavl.TreeOptions{
StateStorage: true,
MetricsProxy: &telemetry.GlobalMetricProxy{},
HeightFilter: 1,
})
if key.Name() == "ibc" {
err = tree.LoadVersion(id.Version)
} else {
err = tree.LoadVersion(id.Version)
}
if err != nil {
return nil, err
}
if err = sql.WarmLeaves(); err != nil {
return nil, err
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -127,6 +145,7 @@ func (s *Store) Has(key []byte) bool {
}

func (s *Store) Set(key, value []byte) {
defer telemetry.MeasureSince(time.Now(), "store", "iavl", "set")
types.AssertValidKey(key)
types.AssertValidValue(value)
_, err := s.Tree.Set(key, value)
Expand Down
Loading

0 comments on commit ec67f4e

Please sign in to comment.