Skip to content

Commit

Permalink
fix txndb read (#264)
Browse files Browse the repository at this point in the history
* fix_read

* fix

Signed-off-by: ReyisaRuby <yisa@reddio.com>

* fix

Signed-off-by: ReyisaRuby <yisa@reddio.com>

---------

Signed-off-by: ReyisaRuby <yisa@reddio.com>
  • Loading branch information
ReyisaRuby authored Jan 9, 2025
1 parent b54f504 commit ddec2fe
Showing 1 changed file with 11 additions and 81 deletions.
92 changes: 11 additions & 81 deletions core/txdb/txdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/yu-org/yu/config"
. "github.com/yu-org/yu/core/types"
"github.com/yu-org/yu/infra/storage/kv"
"github.com/yu-org/yu/infra/storage/sql"
"github.com/yu-org/yu/metrics"
)

Expand All @@ -23,9 +22,6 @@ type TxDB struct {

txnKV *txnkvdb
receiptKV *receipttxnkvdb

enableUseSql bool
db sql.SqlDB
}

type txnkvdb struct {
Expand Down Expand Up @@ -112,37 +108,16 @@ func NewTxDB(nodeTyp int, kvdb kv.Kvdb, kvdbConf *config.KVconf) (ItxDB, error)
txnKV: &txnkvdb{txnKV: kvdb.New(Txns)},
receiptKV: &receipttxnkvdb{receiptKV: kvdb.New(Results)},
}
if kvdbConf != nil && kvdbConf.UseSQlDbConf {
db, err := sql.NewSqlDB(&kvdbConf.SQLDbConf)
if err != nil {
return nil, err
}
txdb.db = db
txdb.enableUseSql = true
if err := txdb.db.AutoMigrate(&TxnDBSchema{}); err != nil {
return nil, err
}
}
return txdb, nil
}

func (bb *TxDB) GetTxn(txnHash Hash) (stxn *SignedTxn, err error) {
if bb.nodeType == LightNode {
return nil, nil
}
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and hash_key = ?", "txn", txnHash.String()).Find(&records).Error
// find result in sql database
if err == nil && len(records) > 0 {
res, err := DecodeSignedTxn(records[0].Value)
metrics.TxnDBCounter.WithLabelValues(txnType, sqlSourceType, "getTxn", getStatusValue(err)).Inc()
return res, err
}
}
res, err := bb.txnKV.GetTxn(txnHash)
r, err := bb.txnKV.GetTxn(txnHash)
metrics.TxnDBCounter.WithLabelValues(txnType, kvSourceType, "getTxn", getStatusValue(err)).Inc()
return res, err
return r, err
}

func (bb *TxDB) GetTxns(txnHashes []Hash) (stxns []*SignedTxn, err error) {
Expand All @@ -167,83 +142,38 @@ func (bb *TxDB) ExistTxn(txnHash Hash) bool {
if bb.nodeType == LightNode {
return false
}
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and hash_key = ?", "txn", txnHash.String()).Find(&records).Error
if err == nil && len(records) > 0 {
return true
}
}
return bb.txnKV.ExistTxn(txnHash)
find := bb.txnKV.ExistTxn(txnHash)
return find
}

func (bb *TxDB) SetTxns(txns []*SignedTxn) (err error) {
if bb.nodeType == LightNode {
return nil
}
defer func() {
metrics.TxnDBCounter.WithLabelValues(txnType, getSourceTypeValue(bb.enableUseSql), "setTxns", getStatusValue(err)).Inc()
metrics.TxnDBCounter.WithLabelValues(txnType, getSourceTypeValue(false), "setTxns", getStatusValue(err)).Inc()
}()
if bb.enableUseSql {
for _, txn := range txns {
txbyt, err := txn.Encode()
if err != nil {
logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
if err := bb.db.Db().Exec("insert into txndb (type, hash_key, value) values (?,?,?)", "txn", txn.TxnHash.String(), txbyt).Error; err != nil {
logrus.Errorf("Insert TxDB.SetTxns tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
}
return nil
}
return bb.txnKV.SetTxns(txns)
}

func (bb *TxDB) SetReceipts(receipts map[Hash]*Receipt) (err error) {
if bb.enableUseSql {
for txHash, receipt := range receipts {
if err := bb.SetReceipt(txHash, receipt); err != nil {
return err
}
}
return nil
}
defer func() {
metrics.TxnDBCounter.WithLabelValues(receiptType, getSourceTypeValue(false), "setReceipts", getStatusValue(err)).Inc()
}()
return bb.receiptKV.SetReceipts(receipts)
}

func (bb *TxDB) SetReceipt(txHash Hash, receipt *Receipt) (err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(receiptType, getSourceTypeValue(bb.enableUseSql), "setReceipt", getStatusValue(err)).Inc()
metrics.TxnDBCounter.WithLabelValues(receiptType, getSourceTypeValue(false), "setReceipt", getStatusValue(err)).Inc()
}()
if bb.enableUseSql {
byt, err := receipt.Encode()
if err != nil {
return err
}
if err := bb.db.Db().Exec("insert into txndb (type, hash_key, value) values (?,?,?)", "receipt", txHash.String(), byt).Error; err != nil {
return err
}
return nil
}
return bb.receiptKV.SetReceipt(txHash, receipt)
}

func (bb *TxDB) GetReceipt(txHash Hash) (rec *Receipt, err error) {
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and hash_key = ?", "receipt", txHash.String()).Find(&records).Error
if err == nil && len(records) > 0 {
receipt := new(Receipt)
err = receipt.Decode(records[0].Value)
metrics.TxnDBCounter.WithLabelValues(receiptType, sqlSourceType, "getReceipt", getStatusValue(err)).Inc()
return receipt, err
}
}
res, err := bb.receiptKV.GetReceipt(txHash)
r, err := bb.receiptKV.GetReceipt(txHash)
metrics.TxnDBCounter.WithLabelValues(receiptType, kvSourceType, "getReceipt", getStatusValue(err)).Inc()
return res, err
return r, err
}

type receipttxnkvdb struct {
Expand Down

0 comments on commit ddec2fe

Please sign in to comment.