Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txndb support sql write/double read #259

Merged
merged 6 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ type KVconf struct {
Path string `toml:"path"`
// distributed kvdb
Hosts []string `toml:"hosts"`

UseSQlDbConf bool `toml:"use_sql_db"`
SQLDbConf SqlDbConf `toml:"sql_db"`
}

type SqlDbConf struct {
Expand Down
9 changes: 6 additions & 3 deletions core/startup/startup.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package startup

import (
"os"
"path"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"

"github.com/yu-org/yu/apps/synchronizer"
"github.com/yu-org/yu/config"
"github.com/yu-org/yu/core/blockchain"
Expand All @@ -17,8 +21,6 @@ import (
"github.com/yu-org/yu/infra/p2p"
"github.com/yu-org/yu/infra/storage/kv"
"github.com/yu-org/yu/utils/codec"
"os"
"path"
)

var (
Expand Down Expand Up @@ -68,7 +70,8 @@ func InitKernel(cfg *config.KernelConf) *kernel.Kernel {
}

if TxnDB == nil {
TxnDB = txdb.NewTxDB(cfg.NodeType, kvdb)
TxnDB, err = txdb.NewTxDB(cfg.NodeType, kvdb, &cfg.KVDB)
logrus.Fatal("init kvdb error: ", err)
ReyisaRuby marked this conversation as resolved.
Show resolved Hide resolved
}
if Chain == nil {
Chain = blockchain.NewBlockChain(cfg.NodeType, &cfg.BlockChain, TxnDB)
Expand Down
269 changes: 214 additions & 55 deletions core/txdb/txdb.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package txdb

import (
"sync"

"github.com/sirupsen/logrus"

. "github.com/yu-org/yu/common"
"github.com/yu-org/yu/config"
. "github.com/yu-org/yu/core/types"
"github.com/yu-org/yu/infra/storage/kv"
"github.com/yu-org/yu/infra/storage/sql"
"github.com/yu-org/yu/metrics"
)

const (
Expand All @@ -13,24 +19,38 @@ const (
)

type TxDB struct {
nodeType int
txnKV kv.KV
receiptKV kv.KV
nodeType int

txnKV *txnkvdb
receiptKV *receipttxnkvdb

enableUseSql bool
db sql.SqlDB
}

func NewTxDB(nodeTyp int, kvdb kv.Kvdb) ItxDB {
return &TxDB{
nodeType: nodeTyp,
txnKV: kvdb.New(Txns),
receiptKV: kvdb.New(Results),
}
type txnkvdb struct {
sync.RWMutex
txnKV kv.KV
}

func (bb *TxDB) GetTxn(txnHash Hash) (*SignedTxn, error) {
if bb.nodeType == LightNode {
return nil, nil
const (
txnType = "txn"
receiptType = "receipt"
successStatus = "success"
errStatus = "err"
)

func getStatusValue(err error) string {
if err == nil {
return successStatus
}
byt, err := bb.txnKV.Get(txnHash.Bytes())
return errStatus
}

func (t *txnkvdb) GetTxn(txnHash Hash) (txn *SignedTxn, err error) {
t.RLock()
defer t.RUnlock()
byt, err := t.txnKV.Get(txnHash.Bytes())
if err != nil {
return nil, err
}
Expand All @@ -40,24 +60,95 @@ func (bb *TxDB) GetTxn(txnHash Hash) (*SignedTxn, error) {
return DecodeSignedTxn(byt)
}

func (bb *TxDB) GetTxns(txnHashes []Hash) ([]*SignedTxn, error) {
if bb.nodeType == LightNode {
return nil, nil
func (t *txnkvdb) ExistTxn(txnHash Hash) bool {
t.RLock()
defer t.RUnlock()
return t.txnKV.Exist(txnHash.Bytes())
}

func (t *txnkvdb) SetTxns(txns []*SignedTxn) (err error) {
t.Lock()
defer t.Unlock()
kvtx, err := t.txnKV.NewKvTxn()
if err != nil {
return err
}
txns := make([]*SignedTxn, 0)
for _, txnHash := range txnHashes {
byt, err := bb.txnKV.Get(txnHash.Bytes())
for _, txn := range txns {
txbyt, err := txn.Encode()
if err != nil {
logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
err = kvtx.Set(txn.TxnHash.Bytes(), txbyt)
if err != nil {
return err
}
}
return kvtx.Commit()
}

type TxnDBSchema struct {
Type string `gorm:"type:varchar(10)"`
Key string `gorm:"primaryKey;type:text"`
Value string `gorm:"type:text"`
}

func (TxnDBSchema) TableName() string {
return "txndb"
}

func NewTxDB(nodeTyp int, kvdb kv.Kvdb, kvdbConf *config.KVconf) (ItxDB, error) {
txdb := &TxDB{
nodeType: nodeTyp,
txnKV: &txnkvdb{txnKV: kvdb.New(Txns)},
receiptKV: &receipttxnkvdb{receiptKV: kvdb.New(Results)},
}
if kvdbConf != nil && kvdbConf.UseSQlDbConf {
db, err := sql.NewSqlDB(&kvdbConf.SQLDbConf)
if err != nil {
return nil, err
}
txdb.db = db
txdb.enableUseSql = true
if err := txdb.db.AutoMigrate(&TxnDBSchema{}); err != nil {
return nil, err
}
if byt == nil {
continue
}
return txdb, nil
}

func (bb *TxDB) GetTxn(txnHash Hash) (stxn *SignedTxn, err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(txnType, "getTxn", getStatusValue(err)).Inc()
}()
if bb.nodeType == LightNode {
return nil, nil
}
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "txn", string(txnHash.Bytes())).Find(&records).Error
// find result in sql database
if err == nil && len(records) > 0 {
return DecodeSignedTxn([]byte(records[0].Value))
}
signedTxn, err := DecodeSignedTxn(byt)
}
return bb.txnKV.GetTxn(txnHash)
}

func (bb *TxDB) GetTxns(txnHashes []Hash) (stxns []*SignedTxn, err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(txnType, "getTxn", getStatusValue(err)).Inc()
}()
if bb.nodeType == LightNode {
return nil, nil
}
txns := make([]*SignedTxn, 0)
for _, txnHash := range txnHashes {
result, err := bb.GetTxn(txnHash)
if err != nil {
return nil, err
}
txns = append(txns, signedTxn)
txns = append(txns, result)
}
return txns, nil
}
Expand All @@ -66,61 +157,99 @@ func (bb *TxDB) ExistTxn(txnHash Hash) bool {
if bb.nodeType == LightNode {
return false
}
return bb.txnKV.Exist(txnHash.Bytes())
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "txn", string(txnHash.Bytes())).Find(&records).Error
if err == nil && len(records) > 0 {
return true
}
}
return bb.txnKV.ExistTxn(txnHash)
}

func (bb *TxDB) SetTxns(txns []*SignedTxn) error {
func (bb *TxDB) SetTxns(txns []*SignedTxn) (err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(txnType, "setTxns", getStatusValue(err)).Inc()
}()
if bb.nodeType == LightNode {
return nil
}
kvtx, err := bb.txnKV.NewKvTxn()
if err != nil {
return err
}
for _, txn := range txns {
txbyt, err := txn.Encode()
if err != nil {
logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
err = kvtx.Set(txn.TxnHash.Bytes(), txbyt)
if err != nil {
return err
if bb.enableUseSql {
for _, txn := range txns {
txbyt, err := txn.Encode()
if err != nil {
logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
if err := bb.db.Db().Exec("insert into txndb (type,key,value) values (?,?,?)", "txn", string(txn.TxnHash.Bytes()), string(txbyt)).Error; err != nil {
logrus.Errorf("Insert TxDB.SetTxns tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
}
return nil
}
return kvtx.Commit()
return bb.txnKV.SetTxns(txns)
}

func (bb *TxDB) SetReceipts(receipts map[Hash]*Receipt) error {
kvtx, err := bb.receiptKV.NewKvTxn()
if err != nil {
return err
func (bb *TxDB) SetReceipts(receipts map[Hash]*Receipt) (err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(receiptType, "setReceipts", getStatusValue(err)).Inc()
}()
if bb.enableUseSql {
for txHash, receipt := range receipts {
if err := bb.SetReceipt(txHash, receipt); err != nil {
return err
}
}
return nil
}
return bb.receiptKV.SetReceipts(receipts)
}

for txHash, receipt := range receipts {
func (bb *TxDB) SetReceipt(txHash Hash, receipt *Receipt) (err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(receiptType, "setReceipt", getStatusValue(err)).Inc()
}()
if bb.enableUseSql {
byt, err := receipt.Encode()
if err != nil {
return err
}
err = kvtx.Set(txHash.Bytes(), byt)
if err != nil {
if err := bb.db.Db().Exec("insert into txndb (type,key,value) values (?,?,?)", "receipt", string(txHash.Bytes()), string(byt)).Error; err != nil {
ReyisaRuby marked this conversation as resolved.
Show resolved Hide resolved
return err
}
return nil
}

return kvtx.Commit()
return bb.receiptKV.SetReceipt(txHash, receipt)
}

func (bb *TxDB) SetReceipt(txHash Hash, receipt *Receipt) error {
byt, err := receipt.Encode()
if err != nil {
return err
func (bb *TxDB) GetReceipt(txHash Hash) (rec *Receipt, err error) {
defer func() {
metrics.TxnDBCounter.WithLabelValues(receiptType, "getReceipt", getStatusValue(err)).Inc()
}()
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "receipt", string(txHash.Bytes())).Find(&records).Error
ReyisaRuby marked this conversation as resolved.
Show resolved Hide resolved
if err == nil && len(records) > 0 {
receipt := new(Receipt)
err = receipt.Decode([]byte(records[0].Value))
if err == nil {
return receipt, nil
}
}
}
return bb.receiptKV.Set(txHash.Bytes(), byt)
return bb.receiptKV.GetReceipt(txHash)
}

func (bb *TxDB) GetReceipt(txHash Hash) (*Receipt, error) {
byt, err := bb.receiptKV.Get(txHash.Bytes())
type receipttxnkvdb struct {
sync.RWMutex
receiptKV kv.KV
}

func (r *receipttxnkvdb) GetReceipt(txHash Hash) (*Receipt, error) {
r.RLock()
defer r.RUnlock()
byt, err := r.receiptKV.Get(txHash.Bytes())
if err != nil {
logrus.Errorf("TxDB.GetReceipt(%s), failed: %s, error: %v", txHash.String(), string(byt), err)
return nil, err
Expand All @@ -135,3 +264,33 @@ func (bb *TxDB) GetReceipt(txHash Hash) (*Receipt, error) {
}
return receipt, err
}

func (r *receipttxnkvdb) SetReceipt(txHash Hash, receipt *Receipt) error {
r.Lock()
defer r.Unlock()
byt, err := receipt.Encode()
if err != nil {
return err
}
return r.receiptKV.Set(txHash.Bytes(), byt)
}

func (r *receipttxnkvdb) SetReceipts(receipts map[Hash]*Receipt) error {
r.Lock()
defer r.Unlock()
kvtx, err := r.receiptKV.NewKvTxn()
if err != nil {
return err
}
for txHash, receipt := range receipts {
byt, err := receipt.Encode()
if err != nil {
return err
}
err = kvtx.Set(txHash.Bytes(), byt)
if err != nil {
return err
}
}
return kvtx.Commit()
}
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,5 @@ func init() {
prometheus.MustRegister(TxpoolSizeGauge)
// prometheus.MustRegister(AppendBlockDuration, StartBlockDuration, EndBlockDuration, FinalizeBlockDuration)
prometheus.MustRegister(StateCommitDuration)
initTxnDBMetrics()
}
Loading
Loading