Skip to content

Commit

Permalink
support sql write/double read
Browse files Browse the repository at this point in the history
Signed-off-by: ReyisaRuby <yisa@reddio.com>
  • Loading branch information
ReyisaRuby committed Jan 7, 2025
1 parent 0246c0a commit d5047f2
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 15 deletions.
3 changes: 3 additions & 0 deletions config/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ type KVconf struct {
Path string `toml:"path"`
// distributed kvdb
Hosts []string `toml:"hosts"`

UseSQlDbConf bool `toml:"use_sql_db"`
SQLDbConf SqlDbConf `toml:"sql_db"`
}

type SqlDbConf struct {
Expand Down
6 changes: 4 additions & 2 deletions core/startup/startup.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package startup

import (
"os"
"path"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"

"github.com/yu-org/yu/apps/synchronizer"
"github.com/yu-org/yu/config"
"github.com/yu-org/yu/core/blockchain"
Expand All @@ -17,8 +21,6 @@ import (
"github.com/yu-org/yu/infra/p2p"
"github.com/yu-org/yu/infra/storage/kv"
"github.com/yu-org/yu/utils/codec"
"os"
"path"
)

var (
Expand Down
105 changes: 92 additions & 13 deletions core/txdb/txdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package txdb

import (
"github.com/sirupsen/logrus"

. "github.com/yu-org/yu/common"
"github.com/yu-org/yu/config"
. "github.com/yu-org/yu/core/types"
"github.com/yu-org/yu/infra/storage/kv"
"github.com/yu-org/yu/infra/storage/sql"
)

const (
Expand All @@ -13,23 +16,57 @@ const (
)

type TxDB struct {
nodeType int
nodeType int

txnKV kv.KV
receiptKV kv.KV

enableUseSql bool
db sql.SqlDB
}

type TxnDBSchema struct {
Type string `gorm:"type:varchar(10)"`
Key string `gorm:"primaryKey;type:text"`
Value string `gorm:"type:text"`
}

func NewTxDB(nodeTyp int, kvdb kv.Kvdb) ItxDB {
return &TxDB{
func (TxnDBSchema) TableName() string {
return "txndb"
}

func NewTxDB(nodeTyp int, kvdb kv.Kvdb, kvdbConf *config.KVconf) (ItxDB, error) {
txdb := &TxDB{
nodeType: nodeTyp,
txnKV: kvdb.New(Txns),
receiptKV: kvdb.New(Results),
}
if kvdbConf.UseSQlDbConf {
db, err := sql.NewSqlDB(&kvdbConf.SQLDbConf)
if err != nil {
return nil, err
}
txdb.db = db
txdb.enableUseSql = true
}
if err := txdb.db.AutoMigrate(&TxnDBSchema{}); err != nil {
return nil, err
}
return txdb, nil
}

func (bb *TxDB) GetTxn(txnHash Hash) (*SignedTxn, error) {
if bb.nodeType == LightNode {
return nil, nil
}
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "txn", string(txnHash.Bytes())).Find(&records).Error
// find result in sql database
if err == nil && len(records) > 0 {
return DecodeSignedTxn([]byte(records[0].Value))
}
}
byt, err := bb.txnKV.Get(txnHash.Bytes())
if err != nil {
return nil, err
Expand All @@ -46,18 +83,11 @@ func (bb *TxDB) GetTxns(txnHashes []Hash) ([]*SignedTxn, error) {
}
txns := make([]*SignedTxn, 0)
for _, txnHash := range txnHashes {
byt, err := bb.txnKV.Get(txnHash.Bytes())
if err != nil {
return nil, err
}
if byt == nil {
continue
}
signedTxn, err := DecodeSignedTxn(byt)
result, err := bb.GetTxn(txnHash)
if err != nil {
return nil, err
}
txns = append(txns, signedTxn)
txns = append(txns, result)
}
return txns, nil
}
Expand All @@ -66,13 +96,34 @@ func (bb *TxDB) ExistTxn(txnHash Hash) bool {
if bb.nodeType == LightNode {
return false
}
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "txn", string(txnHash.Bytes())).Find(&records).Error
if err == nil && len(records) > 0 {
return true
}
}
return bb.txnKV.Exist(txnHash.Bytes())
}

func (bb *TxDB) SetTxns(txns []*SignedTxn) error {
if bb.nodeType == LightNode {
return nil
}
if bb.enableUseSql {
for _, txn := range txns {
txbyt, err := txn.Encode()
if err != nil {
logrus.Errorf("TxDB.SetTxns set tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
if err := bb.db.Db().Exec("insert into txndb (type,key,value) values (?,?,?)", "txn", string(txn.TxnHash.Bytes()), string(txbyt)).Error; err != nil {
logrus.Errorf("Insert TxDB.SetTxns tx(%s) failed: %v", txn.TxnHash.String(), err)
return err
}
}
return nil
}
kvtx, err := bb.txnKV.NewKvTxn()
if err != nil {
return err
Expand All @@ -92,11 +143,18 @@ func (bb *TxDB) SetTxns(txns []*SignedTxn) error {
}

func (bb *TxDB) SetReceipts(receipts map[Hash]*Receipt) error {
if bb.enableUseSql {
for txHash, receipt := range receipts {
if err := bb.SetReceipt(txHash, receipt); err != nil {
return err
}
}
return nil
}
kvtx, err := bb.receiptKV.NewKvTxn()
if err != nil {
return err
}

for txHash, receipt := range receipts {
byt, err := receipt.Encode()
if err != nil {
Expand All @@ -112,6 +170,16 @@ func (bb *TxDB) SetReceipts(receipts map[Hash]*Receipt) error {
}

func (bb *TxDB) SetReceipt(txHash Hash, receipt *Receipt) error {
if bb.enableUseSql {
byt, err := receipt.Encode()
if err != nil {
return err
}
if err := bb.db.Db().Exec("insert into txndb (type,key,value) values (?,?,?)", "receipt", string(txHash.Bytes()), string(byt)).Error; err != nil {
return err
}
return nil
}
byt, err := receipt.Encode()
if err != nil {
return err
Expand All @@ -120,6 +188,17 @@ func (bb *TxDB) SetReceipt(txHash Hash, receipt *Receipt) error {
}

func (bb *TxDB) GetReceipt(txHash Hash) (*Receipt, error) {
if bb.enableUseSql {
var records []TxnDBSchema
err := bb.db.Db().Raw("select value from txndb where type = ? and key = ?", "receipt", string(txHash.Bytes())).Find(&records).Error
if err == nil && len(records) > 0 {
receipt := new(Receipt)
err = receipt.Decode([]byte(records[0].Value))
if err == nil {
return receipt, nil
}
}
}
byt, err := bb.receiptKV.Get(txHash.Bytes())
if err != nil {
logrus.Errorf("TxDB.GetReceipt(%s), failed: %s, error: %v", txHash.String(), string(byt), err)
Expand Down

0 comments on commit d5047f2

Please sign in to comment.