Skip to content

Commit

Permalink
txndb support sql write/double read (#259)
Browse files Browse the repository at this point in the history
* support sql write/double read

Signed-off-by: ReyisaRuby <yisa@reddio.com>

* fix

Signed-off-by: ReyisaRuby <yisa@reddio.com>

* add lock

Signed-off-by: ReyisaRuby <yisa@reddio.com>

* add metrics

Signed-off-by: ReyisaRuby <yisa@reddio.com>

* fix

Signed-off-by: ReyisaRuby <yisa@reddio.com>

* address the comment

Signed-off-by: ReyisaRuby <yisa@reddio.com>

---------

Signed-off-by: ReyisaRuby <yisa@reddio.com>
  • Loading branch information
ReyisaRuby authored Jan 7, 2025
1 parent 0246c0a commit bf91944
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 58 deletions.
3 changes: 3 additions & 0 deletions config/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ type KVconf struct {
Path string `toml:"path"`
// distributed kvdb
Hosts []string `toml:"hosts"`

UseSQlDbConf bool `toml:"use_sql_db"`
SQLDbConf SqlDbConf `toml:"sql_db"`
}

type SqlDbConf struct {
Expand Down
11 changes: 8 additions & 3 deletions core/startup/startup.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package startup

import (
"os"
"path"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"

"github.com/yu-org/yu/apps/synchronizer"
"github.com/yu-org/yu/config"
"github.com/yu-org/yu/core/blockchain"
Expand All @@ -17,8 +21,6 @@ import (
"github.com/yu-org/yu/infra/p2p"
"github.com/yu-org/yu/infra/storage/kv"
"github.com/yu-org/yu/utils/codec"
"os"
"path"
)

var (
Expand Down Expand Up @@ -68,7 +70,10 @@ func InitKernel(cfg *config.KernelConf) *kernel.Kernel {
}

if TxnDB == nil {
TxnDB = txdb.NewTxDB(cfg.NodeType, kvdb)
TxnDB, err = txdb.NewTxDB(cfg.NodeType, kvdb, &cfg.KVDB)
if err != nil {
logrus.Fatal("init kvdb error: ", err)
}
}
if Chain == nil {
Chain = blockchain.NewBlockChain(cfg.NodeType, &cfg.BlockChain, TxnDB)
Expand Down
269 changes: 214 additions & 55 deletions core/txdb/txdb.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package txdb

import (
"sync"

"github.com/sirupsen/logrus"

. "github.com/yu-org/yu/common"
"github.com/yu-org/yu/config"
. "github.com/yu-org/yu/core/types"
"github.com/yu-org/yu/infra/storage/kv"
"github.com/yu-org/yu/infra/storage/sql"
"github.com/yu-org/yu/metrics"
)

const (
Expand All @@ -13,24 +19,38 @@ const (
)

type TxDB struct {
nodeType int
txnKV kv.KV
receiptKV kv.KV
nodeType int

txnKV *txnkvdb
receiptKV *receipttxnkvdb

enableUseSql bool
db sql.SqlDB
}

func NewTxDB(nodeTyp int, kvdb kv.Kvdb) ItxDB {
return &TxDB{
nodeType: nodeTyp,
txnKV: kvdb.New(Txns),
receiptKV: kvdb.New(Results),
}
type txnkvdb struct {
sync.RWMutex
txnKV kv.KV
}

func (bb *TxDB) GetTxn(txnHash Hash) (*SignedTxn, error) {
if bb.nodeType == LightNode {
return nil, nil
const (
txnType = "txn"
receiptType = "receipt"
successStatus = "success"
errStatus = "err"
)

func getStatusValue(err error) string {
if err == nil {
return successStatus
}
byt, err := bb.txnKV.Get(txnHash.Bytes())
return errStatus
}

func (t *txnkvdb) GetTxn(txnHash Hash) (txn *SignedTxn, err error) {
t.RLock()
defer t.RUnlock()
byt, err := t.txnKV.Get(txnHash.Bytes())
if err != nil {
return nil, err
}
Expand All @@ -40,24 +60,95 @@ func (bb *TxDB) GetTxn(txnHash Hash) (*SignedTxn, error) {
return DecodeSignedTxn(byt)
}

func (bb *TxDB) GetTxns(txnHashes []Hash) ([]*SignedTxn, error) {
if bb.nodeType == LightNode {
return nil, nil
func (t *txnkvdb) ExistTxn(txnHash Hash) bool {
t.RLock()
defer t.RUnlock()
return t.txnKV.Exist(txnHash.Bytes())
}

func (t *txnkvdb) SetTxns(txns []*SignedTxn) (err error) {
t.Lock()
defer t.Unlock()
kvtx, err := t.txnKV.NewKvTxn()
if err != nil {
return err
}
txns := make([]*SignedTxn, 0)
for _, txnHash := range txnHashes {
byt, err := bb.txnKV.Get(txnHash.Bytes())
for _, txn := range txns {
txbyt, err := txn.Encode()
if err != nil {
logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
err = kvtx.Set(txn.TxnHash.Bytes(), txbyt)
if err != nil {
return err
}
}
return kvtx.Commit()
}

type TxnDBSchema struct {
Type string `gorm:"type:varchar(10)"`
Key string `gorm:"primaryKey;type:text"`
Value string `gorm:"type:text"`
}

func (TxnDBSchema) TableName() string {
return "txndb"
}

func NewTxDB(nodeTyp int, kvdb kv.Kvdb, kvdbConf *config.KVconf) (ItxDB, error) {
txdb := &TxDB{
nodeType: nodeTyp,
txnKV: &txnkvdb{txnKV: kvdb.New(Txns)},
receiptKV: &receipttxnkvdb{receiptKV: kvdb.New(Results)},
}
if kvdbConf != nil && kvdbConf.UseSQlDbConf {
db, err := sql.NewSqlDB(&kvdbConf.SQLDbConf)
if err != nil {
return nil, err
}
txdb.db = db
txdb.enableUseSql = true
if err := txdb.db.AutoMigrate(&TxnDBSchema{}); err != nil {
return nil, err
}
if byt == nil {
continue
}
return txdb, nil
}

func (bb *TxDB) GetTxn(txnHash Hash) (stxn *SignedTxn, err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(txnType, "getTxn", getStatusValue(err)).Inc()
}()
if bb.nodeType == LightNode {
return nil, nil
}
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "txn", txnHash.String()).Find(&records).Error
// find result in sql database
if err == nil && len(records) > 0 {
return DecodeSignedTxn([]byte(records[0].Value))
}
signedTxn, err := DecodeSignedTxn(byt)
}
return bb.txnKV.GetTxn(txnHash)
}

func (bb *TxDB) GetTxns(txnHashes []Hash) (stxns []*SignedTxn, err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(txnType, "getTxn", getStatusValue(err)).Inc()
}()
if bb.nodeType == LightNode {
return nil, nil
}
txns := make([]*SignedTxn, 0)
for _, txnHash := range txnHashes {
result, err := bb.GetTxn(txnHash)
if err != nil {
return nil, err
}
txns = append(txns, signedTxn)
txns = append(txns, result)
}
return txns, nil
}
Expand All @@ -66,61 +157,99 @@ func (bb *TxDB) ExistTxn(txnHash Hash) bool {
if bb.nodeType == LightNode {
return false
}
return bb.txnKV.Exist(txnHash.Bytes())
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "txn", txnHash.String()).Find(&records).Error
if err == nil && len(records) > 0 {
return true
}
}
return bb.txnKV.ExistTxn(txnHash)
}

func (bb *TxDB) SetTxns(txns []*SignedTxn) error {
func (bb *TxDB) SetTxns(txns []*SignedTxn) (err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(txnType, "setTxns", getStatusValue(err)).Inc()
}()
if bb.nodeType == LightNode {
return nil
}
kvtx, err := bb.txnKV.NewKvTxn()
if err != nil {
return err
}
for _, txn := range txns {
txbyt, err := txn.Encode()
if err != nil {
logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
err = kvtx.Set(txn.TxnHash.Bytes(), txbyt)
if err != nil {
return err
if bb.enableUseSql {
for _, txn := range txns {
txbyt, err := txn.Encode()
if err != nil {
logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
if err := bb.db.Db().Exec("insert into txndb (type,key,value) values (?,?,?)", "txn", txn.TxnHash.String(), string(txbyt)).Error; err != nil {
logrus.Errorf("Insert TxDB.SetTxns tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
}
return nil
}
return kvtx.Commit()
return bb.txnKV.SetTxns(txns)
}

func (bb *TxDB) SetReceipts(receipts map[Hash]*Receipt) error {
kvtx, err := bb.receiptKV.NewKvTxn()
if err != nil {
return err
func (bb *TxDB) SetReceipts(receipts map[Hash]*Receipt) (err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(receiptType, "setReceipts", getStatusValue(err)).Inc()
}()
if bb.enableUseSql {
for txHash, receipt := range receipts {
if err := bb.SetReceipt(txHash, receipt); err != nil {
return err
}
}
return nil
}
return bb.receiptKV.SetReceipts(receipts)
}

for txHash, receipt := range receipts {
func (bb *TxDB) SetReceipt(txHash Hash, receipt *Receipt) (err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(receiptType, "setReceipt", getStatusValue(err)).Inc()
}()
if bb.enableUseSql {
byt, err := receipt.Encode()
if err != nil {
return err
}
err = kvtx.Set(txHash.Bytes(), byt)
if err != nil {
if err := bb.db.Db().Exec("insert into txndb (type,key,value) values (?,?,?)", "receipt", txHash.String(), string(byt)).Error; err != nil {
return err
}
return nil
}

return kvtx.Commit()
return bb.receiptKV.SetReceipt(txHash, receipt)
}

func (bb *TxDB) SetReceipt(txHash Hash, receipt *Receipt) error {
byt, err := receipt.Encode()
if err != nil {
return err
func (bb *TxDB) GetReceipt(txHash Hash) (rec *Receipt, err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(receiptType, "getReceipt", getStatusValue(err)).Inc()
}()
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "receipt", txHash.String()).Find(&records).Error
if err == nil && len(records) > 0 {
receipt := new(Receipt)
err = receipt.Decode([]byte(records[0].Value))
if err == nil {
return receipt, nil
}
}
}
return bb.receiptKV.Set(txHash.Bytes(), byt)
return bb.receiptKV.GetReceipt(txHash)
}

func (bb *TxDB) GetReceipt(txHash Hash) (*Receipt, error) {
byt, err := bb.receiptKV.Get(txHash.Bytes())
type receipttxnkvdb struct {
sync.RWMutex
receiptKV kv.KV
}

func (r *receipttxnkvdb) GetReceipt(txHash Hash) (*Receipt, error) {
r.RLock()
defer r.RUnlock()
byt, err := r.receiptKV.Get(txHash.Bytes())
if err != nil {
logrus.Errorf("TxDB.GetReceipt(%s), failed: %s, error: %v", txHash.String(), string(byt), err)
return nil, err
Expand All @@ -135,3 +264,33 @@ func (bb *TxDB) GetReceipt(txHash Hash) (*Receipt, error) {
}
return receipt, err
}

func (r *receipttxnkvdb) SetReceipt(txHash Hash, receipt *Receipt) error {
r.Lock()
defer r.Unlock()
byt, err := receipt.Encode()
if err != nil {
return err
}
return r.receiptKV.Set(txHash.Bytes(), byt)
}

func (r *receipttxnkvdb) SetReceipts(receipts map[Hash]*Receipt) error {
r.Lock()
defer r.Unlock()
kvtx, err := r.receiptKV.NewKvTxn()
if err != nil {
return err
}
for txHash, receipt := range receipts {
byt, err := receipt.Encode()
if err != nil {
return err
}
err = kvtx.Set(txHash.Bytes(), byt)
if err != nil {
return err
}
}
return kvtx.Commit()
}
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,5 @@ func init() {
prometheus.MustRegister(TxpoolSizeGauge)
// prometheus.MustRegister(AppendBlockDuration, StartBlockDuration, EndBlockDuration, FinalizeBlockDuration)
prometheus.MustRegister(StateCommitDuration)
initTxnDBMetrics()
}
Loading

0 comments on commit bf91944

Please sign in to comment.