From bf91944ed89b9d608f99fc785d654befbb946c0b Mon Sep 17 00:00:00 2001 From: ReyisaRuby Date: Tue, 7 Jan 2025 18:09:44 +0800 Subject: [PATCH] txndb support sql write/double read (#259) * support sql write/double read Signed-off-by: ReyisaRuby * fix Signed-off-by: ReyisaRuby * add lock Signed-off-by: ReyisaRuby * add metrics Signed-off-by: ReyisaRuby * fix Signed-off-by: ReyisaRuby * address the comment Signed-off-by: ReyisaRuby --------- Signed-off-by: ReyisaRuby --- config/storage.go | 3 + core/startup/startup.go | 11 +- core/txdb/txdb.go | 269 ++++++++++++++++++++++++++++++++-------- metrics/metrics.go | 1 + metrics/txdb_metrics.go | 22 ++++ 5 files changed, 248 insertions(+), 58 deletions(-) create mode 100644 metrics/txdb_metrics.go diff --git a/config/storage.go b/config/storage.go index bd693fe..db730b4 100644 --- a/config/storage.go +++ b/config/storage.go @@ -7,6 +7,9 @@ type KVconf struct { Path string `toml:"path"` // distributed kvdb Hosts []string `toml:"hosts"` + + UseSQlDbConf bool `toml:"use_sql_db"` + SQLDbConf SqlDbConf `toml:"sql_db"` } type SqlDbConf struct { diff --git a/core/startup/startup.go b/core/startup/startup.go index 9b0fb04..c182f87 100644 --- a/core/startup/startup.go +++ b/core/startup/startup.go @@ -1,8 +1,12 @@ package startup import ( + "os" + "path" + "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" + "github.com/yu-org/yu/apps/synchronizer" "github.com/yu-org/yu/config" "github.com/yu-org/yu/core/blockchain" @@ -17,8 +21,6 @@ import ( "github.com/yu-org/yu/infra/p2p" "github.com/yu-org/yu/infra/storage/kv" "github.com/yu-org/yu/utils/codec" - "os" - "path" ) var ( @@ -68,7 +70,10 @@ func InitKernel(cfg *config.KernelConf) *kernel.Kernel { } if TxnDB == nil { - TxnDB = txdb.NewTxDB(cfg.NodeType, kvdb) + TxnDB, err = txdb.NewTxDB(cfg.NodeType, kvdb, &cfg.KVDB) + if err != nil { + logrus.Fatal("init kvdb error: ", err) + } } if Chain == nil { Chain = blockchain.NewBlockChain(cfg.NodeType, &cfg.BlockChain, TxnDB) diff --git a/core/txdb/txdb.go b/core/txdb/txdb.go index 5bd103c..2b33a0e 100644 --- a/core/txdb/txdb.go +++ b/core/txdb/txdb.go @@ -1,10 +1,16 @@ package txdb import ( + "sync" + "github.com/sirupsen/logrus" + . "github.com/yu-org/yu/common" + "github.com/yu-org/yu/config" . "github.com/yu-org/yu/core/types" "github.com/yu-org/yu/infra/storage/kv" + "github.com/yu-org/yu/infra/storage/sql" + "github.com/yu-org/yu/metrics" ) const ( @@ -13,24 +19,38 @@ const ( ) type TxDB struct { - nodeType int - txnKV kv.KV - receiptKV kv.KV + nodeType int + + txnKV *txnkvdb + receiptKV *receipttxnkvdb + + enableUseSql bool + db sql.SqlDB } -func NewTxDB(nodeTyp int, kvdb kv.Kvdb) ItxDB { - return &TxDB{ - nodeType: nodeTyp, - txnKV: kvdb.New(Txns), - receiptKV: kvdb.New(Results), - } +type txnkvdb struct { + sync.RWMutex + txnKV kv.KV } -func (bb *TxDB) GetTxn(txnHash Hash) (*SignedTxn, error) { - if bb.nodeType == LightNode { - return nil, nil +const ( + txnType = "txn" + receiptType = "receipt" + successStatus = "success" + errStatus = "err" +) + +func getStatusValue(err error) string { + if err == nil { + return successStatus } - byt, err := bb.txnKV.Get(txnHash.Bytes()) + return errStatus +} + +func (t *txnkvdb) GetTxn(txnHash Hash) (txn *SignedTxn, err error) { + t.RLock() + defer t.RUnlock() + byt, err := t.txnKV.Get(txnHash.Bytes()) if err != nil { return nil, err } @@ -40,24 +60,95 @@ func (bb *TxDB) GetTxn(txnHash Hash) (*SignedTxn, error) { return DecodeSignedTxn(byt) } -func (bb *TxDB) GetTxns(txnHashes []Hash) ([]*SignedTxn, error) { - if bb.nodeType == LightNode { - return nil, nil +func (t *txnkvdb) ExistTxn(txnHash Hash) bool { + t.RLock() + defer t.RUnlock() + return t.txnKV.Exist(txnHash.Bytes()) +} + +func (t *txnkvdb) SetTxns(txns []*SignedTxn) (err error) { + t.Lock() + defer t.Unlock() + kvtx, err := t.txnKV.NewKvTxn() + if err != nil { + return err } - txns := make([]*SignedTxn, 0) - for _, txnHash := range txnHashes { - byt, err := bb.txnKV.Get(txnHash.Bytes()) + for _, txn := range txns { + txbyt, err := txn.Encode() + if err != nil { + logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err) + return err + } + err = kvtx.Set(txn.TxnHash.Bytes(), txbyt) if err != nil { + return err + } + } + return kvtx.Commit() +} + +type TxnDBSchema struct { + Type string `gorm:"type:varchar(10)"` + Key string `gorm:"primaryKey;type:text"` + Value string `gorm:"type:text"` +} + +func (TxnDBSchema) TableName() string { + return "txndb" +} + +func NewTxDB(nodeTyp int, kvdb kv.Kvdb, kvdbConf *config.KVconf) (ItxDB, error) { + txdb := &TxDB{ + nodeType: nodeTyp, + txnKV: &txnkvdb{txnKV: kvdb.New(Txns)}, + receiptKV: &receipttxnkvdb{receiptKV: kvdb.New(Results)}, + } + if kvdbConf != nil && kvdbConf.UseSQlDbConf { + db, err := sql.NewSqlDB(&kvdbConf.SQLDbConf) + if err != nil { + return nil, err + } + txdb.db = db + txdb.enableUseSql = true + if err := txdb.db.AutoMigrate(&TxnDBSchema{}); err != nil { return nil, err } - if byt == nil { - continue + } + return txdb, nil +} + +func (bb *TxDB) GetTxn(txnHash Hash) (stxn *SignedTxn, err error) { + defer func() { + metrics.TxnDBCounter.WithLabelValues(txnType, "getTxn", getStatusValue(err)).Inc() + }() + if bb.nodeType == LightNode { + return nil, nil + } + if bb.enableUseSql { + var records []TxnDBSchema + err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "txn", txnHash.String()).Find(&records).Error + // find result in sql database + if err == nil && len(records) > 0 { + return DecodeSignedTxn([]byte(records[0].Value)) } - signedTxn, err := DecodeSignedTxn(byt) + } + return bb.txnKV.GetTxn(txnHash) +} + +func (bb *TxDB) GetTxns(txnHashes []Hash) (stxns []*SignedTxn, err error) { + defer func() { + metrics.TxnDBCounter.WithLabelValues(txnType, "getTxn", getStatusValue(err)).Inc() + }() + if bb.nodeType == LightNode { + return nil, nil + } + txns := make([]*SignedTxn, 0) + for _, txnHash := range txnHashes { + result, err := bb.GetTxn(txnHash) if err != nil { return nil, err } - txns = append(txns, signedTxn) + txns = append(txns, result) } return txns, nil } @@ -66,61 +157,99 @@ func (bb *TxDB) ExistTxn(txnHash Hash) bool { if bb.nodeType == LightNode { return false } - return bb.txnKV.Exist(txnHash.Bytes()) + if bb.enableUseSql { + var records []TxnDBSchema + err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "txn", txnHash.String()).Find(&records).Error + if err == nil && len(records) > 0 { + return true + } + } + return bb.txnKV.ExistTxn(txnHash) } -func (bb *TxDB) SetTxns(txns []*SignedTxn) error { +func (bb *TxDB) SetTxns(txns []*SignedTxn) (err error) { + defer func() { + metrics.TxnDBCounter.WithLabelValues(txnType, "setTxns", getStatusValue(err)).Inc() + }() if bb.nodeType == LightNode { return nil } - kvtx, err := bb.txnKV.NewKvTxn() - if err != nil { - return err - } - for _, txn := range txns { - txbyt, err := txn.Encode() - if err != nil { - logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err) - return err - } - err = kvtx.Set(txn.TxnHash.Bytes(), txbyt) - if err != nil { - return err + if bb.enableUseSql { + for _, txn := range txns { + txbyt, err := txn.Encode() + if err != nil { + logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err) + return err + } + if err := bb.db.Db().Exec("insert into txndb (type,key,value) values (?,?,?)", "txn", txn.TxnHash.String(), string(txbyt)).Error; err != nil { + logrus.Errorf("Insert TxDB.SetTxns tx(%s) failed: %v", txn.TxnHash.String(), err) + return err + } } + return nil } - return kvtx.Commit() + return bb.txnKV.SetTxns(txns) } -func (bb *TxDB) SetReceipts(receipts map[Hash]*Receipt) error { - kvtx, err := bb.receiptKV.NewKvTxn() - if err != nil { - return err +func (bb *TxDB) SetReceipts(receipts map[Hash]*Receipt) (err error) { + defer func() { + metrics.TxnDBCounter.WithLabelValues(receiptType, "setReceipts", getStatusValue(err)).Inc() + }() + if bb.enableUseSql { + for txHash, receipt := range receipts { + if err := bb.SetReceipt(txHash, receipt); err != nil { + return err + } + } + return nil } + return bb.receiptKV.SetReceipts(receipts) +} - for txHash, receipt := range receipts { +func (bb *TxDB) SetReceipt(txHash Hash, receipt *Receipt) (err error) { + defer func() { + metrics.TxnDBCounter.WithLabelValues(receiptType, "setReceipt", getStatusValue(err)).Inc() + }() + if bb.enableUseSql { byt, err := receipt.Encode() if err != nil { return err } - err = kvtx.Set(txHash.Bytes(), byt) - if err != nil { + if err := bb.db.Db().Exec("insert into txndb (type,key,value) values (?,?,?)", "receipt", txHash.String(), string(byt)).Error; err != nil { return err } + return nil } - - return kvtx.Commit() + return bb.receiptKV.SetReceipt(txHash, receipt) } -func (bb *TxDB) SetReceipt(txHash Hash, receipt *Receipt) error { - byt, err := receipt.Encode() - if err != nil { - return err +func (bb *TxDB) GetReceipt(txHash Hash) (rec *Receipt, err error) { + defer func() { + metrics.TxnDBCounter.WithLabelValues(receiptType, "getReceipt", getStatusValue(err)).Inc() + }() + if bb.enableUseSql { + var records []TxnDBSchema + err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "receipt", txHash.String()).Find(&records).Error + if err == nil && len(records) > 0 { + receipt := new(Receipt) + err = receipt.Decode([]byte(records[0].Value)) + if err == nil { + return receipt, nil + } + } } - return bb.receiptKV.Set(txHash.Bytes(), byt) + return bb.receiptKV.GetReceipt(txHash) } -func (bb *TxDB) GetReceipt(txHash Hash) (*Receipt, error) { - byt, err := bb.receiptKV.Get(txHash.Bytes()) +type receipttxnkvdb struct { + sync.RWMutex + receiptKV kv.KV +} + +func (r *receipttxnkvdb) GetReceipt(txHash Hash) (*Receipt, error) { + r.RLock() + defer r.RUnlock() + byt, err := r.receiptKV.Get(txHash.Bytes()) if err != nil { logrus.Errorf("TxDB.GetReceipt(%s), failed: %s, error: %v", txHash.String(), string(byt), err) return nil, err @@ -135,3 +264,33 @@ func (bb *TxDB) GetReceipt(txHash Hash) (*Receipt, error) { } return receipt, err } + +func (r *receipttxnkvdb) SetReceipt(txHash Hash, receipt *Receipt) error { + r.Lock() + defer r.Unlock() + byt, err := receipt.Encode() + if err != nil { + return err + } + return r.receiptKV.Set(txHash.Bytes(), byt) +} + +func (r *receipttxnkvdb) SetReceipts(receipts map[Hash]*Receipt) error { + r.Lock() + defer r.Unlock() + kvtx, err := r.receiptKV.NewKvTxn() + if err != nil { + return err + } + for txHash, receipt := range receipts { + byt, err := receipt.Encode() + if err != nil { + return err + } + err = kvtx.Set(txHash.Bytes(), byt) + if err != nil { + return err + } + } + return kvtx.Commit() +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 2e92a81..9ae268c 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -89,4 +89,5 @@ func init() { prometheus.MustRegister(TxpoolSizeGauge) // prometheus.MustRegister(AppendBlockDuration, StartBlockDuration, EndBlockDuration, FinalizeBlockDuration) prometheus.MustRegister(StateCommitDuration) + initTxnDBMetrics() } diff --git a/metrics/txdb_metrics.go b/metrics/txdb_metrics.go new file mode 100644 index 0000000..a818f5f --- /dev/null +++ b/metrics/txdb_metrics.go @@ -0,0 +1,22 @@ +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +const ( + TypeLbl = "type" + OpLabel = "op" + StatusLbl = "status" +) + +var ( + TxnDBCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "yu", + Subsystem: "txndb", + Name: "op_counter", + Help: "Counter of txnDB", + }, []string{TypeLbl, OpLabel, StatusLbl}) +) + +func initTxnDBMetrics() { + prometheus.MustRegister(TxnDBCounter) +}