Skip to content

Commit

Permalink
Makerbook storage and save snapshot to file (#160)
Browse files Browse the repository at this point in the history
* Store makerbook and snapshot to file

* Update default paths

* Save snapshots only to the file

* minor change

* Fix log

* Revert "Save snapshots only to the file"

This reverts commit 77e92ea.

* Save snapshot only to file

* Fix log again

* Load snapshot from file first

* Return error when file path is not set

* Review fixes

* Review changes

* tiny changes for more idiomatic code

* use channels to queue messages to write to file

* stop closing the channel within consumer logic when shutdown signal comes

* fix ineffectual assignment lint error

* fix order struct

* Add missing Id set

---------

Co-authored-by: wallydrag <debashish.ghatak@gmail.com>
  • Loading branch information
lumos42 and debaghtk authored Feb 28, 2024
1 parent 2b57fd9 commit 2aa8374
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 35 deletions.
4 changes: 3 additions & 1 deletion chain.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@
"is-validator": true,
"trading-api-enabled": true,
"testing-api-enabled": true,
"load-from-snapshot-enabled": true
"load-from-snapshot-enabled": true,
"snapshot-file-path": "/tmp/snapshot",
"makerbook-database-path": "/tmp/makerbook"
}
4 changes: 3 additions & 1 deletion network-configs/aylin/chain.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@
"priority-regossip-addresses": ["0x06CCAD927e6B1d36E219Cb582Af3185D0705f78F"],
"validator-private-key-file": "/home/ubuntu/validator.pk",
"feeRecipient": "<insert address corresponding to private key above>",
"is-validator": true
"is-validator": true,
"snapshot-file-path": "/tmp/snapshot",
"makerbook-database-path": "/tmp/makerbook"
}
4 changes: 3 additions & 1 deletion network-configs/aylin/chain_api_node.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@
"coreth-admin-api-enabled": true,
"eth-apis": ["public-eth","public-eth-filter","net","web3","internal-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"],
"trading-api-enabled": true,
"testing-api-enabled": true
"testing-api-enabled": true,
"snapshot-file-path": "/tmp/snapshot",
"makerbook-database-path": "/tmp/makerbook"
}
4 changes: 3 additions & 1 deletion network-configs/aylin/chain_archival_node.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@
"coreth-admin-api-enabled": true,
"eth-apis": ["public-eth","public-eth-filter","net","web3","internal-public-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"],
"trading-api-enabled": true,
"testing-api-enabled": true
"testing-api-enabled": true,
"snapshot-file-path": "/tmp/snapshot",
"makerbook-database-path": "/tmp/makerbook"
}
4 changes: 3 additions & 1 deletion network-configs/hubblenet/chain_api_node.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@
"admin-api-enabled": true,
"eth-apis": ["public-eth","public-eth-filter","net","web3","internal-public-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"],
"trading-api-enabled": true,
"testing-api-enabled": true
"testing-api-enabled": true,
"snapshot-file-path": "/tmp/snapshot",
"makerbook-database-path": "/tmp/makerbook"
}
4 changes: 3 additions & 1 deletion network-configs/hubblenet/chain_archival_node.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@
"admin-api-enabled": true,
"eth-apis": ["public-eth","public-eth-filter","net","web3","internal-public-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"],
"trading-api-enabled": true,
"testing-api-enabled": true
"testing-api-enabled": true,
"snapshot-file-path": "/tmp/snapshot",
"makerbook-database-path": "/tmp/makerbook"
}
3 changes: 2 additions & 1 deletion network-configs/hubblenet/chain_validator_1.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
"continuous-profiler-frequency": "10m",
"validator-private-key-file": "/var/avalanche/validator.pk",
"feeRecipient": "0xa5e31FbE901362Cc93b6fdab99DB9741c673a942",
"is-validator": true
"is-validator": true,
"snapshot-file-path": "/tmp/snapshot"
}
10 changes: 10 additions & 0 deletions plugin/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ const (
defaultIsValidator = false
defaultTradingAPIEnabled = false
defaultLoadFromSnapshotEnabled = true
defaultSnapshotFilePath = "/tmp/snapshot"
defaultMakerbookDatabasePath = "/tmp/makerbook"
)

var (
Expand Down Expand Up @@ -242,6 +244,12 @@ type Config struct {

// LoadFromSnapshotEnabled = true if the node should load the memory db from a snapshot
LoadFromSnapshotEnabled bool `json:"load-from-snapshot-enabled"`

// SnapshotFilePath is the path to the file which saves the latest snapshot bytes
SnapshotFilePath string `json:"snapshot-file-path"`

// MakerbookDatabasePath is the path to the file which saves the makerbook orders
MakerbookDatabasePath string `json:"makerbook-database-path"`
}

// EthAPIs returns an array of strings representing the Eth APIs that should be enabled
Expand Down Expand Up @@ -306,6 +314,8 @@ func (c *Config) SetDefaults() {
c.IsValidator = defaultIsValidator
c.TradingAPIEnabled = defaultTradingAPIEnabled
c.LoadFromSnapshotEnabled = defaultLoadFromSnapshotEnabled
c.SnapshotFilePath = defaultSnapshotFilePath
c.MakerbookDatabasePath = defaultMakerbookDatabasePath
}

func (d *Duration) UnmarshalJSON(data []byte) (err error) {
Expand Down
69 changes: 59 additions & 10 deletions plugin/evm/limit_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/gob"
"fmt"
"math/big"
"os"
"runtime"
"runtime/debug"
"sync"
Expand Down Expand Up @@ -58,6 +59,7 @@ type limitOrderProcesser struct {
tradingAPIEnabled bool
loadFromSnapshotEnabled bool
snapshotSavedBlockNumber uint64
snapshotFilePath string
tradingAPI *orderbook.TradingAPI
}

Expand Down Expand Up @@ -109,6 +111,7 @@ func NewLimitOrderProcesser(ctx *snow.Context, txPool *txpool.TxPool, shutdownCh
isValidator: config.IsValidator,
tradingAPIEnabled: config.TradingAPIEnabled,
loadFromSnapshotEnabled: config.LoadFromSnapshotEnabled,
snapshotFilePath: config.SnapshotFilePath,
}
}

Expand All @@ -128,7 +131,6 @@ func (lop *limitOrderProcesser) ListenAndProcessTransactions(blockBuilder *block
} else {
if acceptedBlockNumber > 0 {
fromBlock = big.NewInt(int64(acceptedBlockNumber) + 1)
log.Info("ListenAndProcessTransactions - memory DB snapshot loaded", "acceptedBlockNumber", acceptedBlockNumber)
} else {
// not an error, but unlikely after the blockchain is running for some time
log.Warn("ListenAndProcessTransactions - no snapshot found")
Expand Down Expand Up @@ -193,7 +195,7 @@ func (lop *limitOrderProcesser) GetOrderBookAPI() *orderbook.OrderBookAPI {

func (lop *limitOrderProcesser) GetTradingAPI() *orderbook.TradingAPI {
if lop.tradingAPI == nil {
lop.tradingAPI = orderbook.NewTradingAPI(lop.memoryDb, lop.backend, lop.configService)
lop.tradingAPI = orderbook.NewTradingAPI(lop.memoryDb, lop.backend, lop.configService, lop.shutdownChan, lop.shutdownWg)
}
return lop.tradingAPI
}
Expand Down Expand Up @@ -268,6 +270,7 @@ func (lop *limitOrderProcesser) listenAndStoreLimitOrderTransactions() {
lop.memoryDb.Accept(snapshotBlockNumber, snapshotBlock.Timestamp())
err := lop.saveMemoryDBSnapshot(big.NewInt(int64(snapshotBlockNumber)))
if err != nil {
orderbook.SnapshotWriteFailuresCounter.Inc(1)
log.Error("Error in saving memory DB snapshot", "err", err, "snapshotBlockNumber", snapshotBlockNumber, "current blockNumber", blockNumber, "blockNumberFloor", blockNumberFloor)
}
}
Expand Down Expand Up @@ -349,36 +352,76 @@ func (lop *limitOrderProcesser) runMatchingTimer() {
}

func (lop *limitOrderProcesser) loadMemoryDBSnapshot() (acceptedBlockNumber uint64, err error) {
acceptedBlockNumber, err = lop.loadMemoryDBSnapshotFromFile()
if err != nil || acceptedBlockNumber == 0 {
acceptedBlockNumber, err = lop.loadMemoryDBSnapshotFromHubbleDB()
}
return acceptedBlockNumber, err
}

func (lop *limitOrderProcesser) loadMemoryDBSnapshotFromHubbleDB() (uint64, error) {
snapshotFound, err := lop.hubbleDB.Has([]byte(memoryDBSnapshotKey))
if err != nil {
return acceptedBlockNumber, fmt.Errorf("Error in checking snapshot in hubbleDB: err=%v", err)
return 0, fmt.Errorf("Error in checking snapshot in hubbleDB: err=%v", err)
}

if !snapshotFound {
return acceptedBlockNumber, nil
return 0, nil
}

memorySnapshotBytes, err := lop.hubbleDB.Get([]byte(memoryDBSnapshotKey))
if err != nil {
return acceptedBlockNumber, fmt.Errorf("Error in fetching snapshot from hubbleDB; err=%v", err)
return 0, fmt.Errorf("Error in fetching snapshot from hubbleDB; err=%v", err)
}

buf := bytes.NewBuffer(memorySnapshotBytes)
var snapshot orderbook.Snapshot
err = gob.NewDecoder(buf).Decode(&snapshot)
if err != nil {
return 0, fmt.Errorf("Error in snapshot parsing from hubbleDB; err=%v", err)
}

if snapshot.AcceptedBlockNumber != nil && snapshot.AcceptedBlockNumber.Uint64() > 0 {
err = lop.memoryDb.LoadFromSnapshot(snapshot)
if err != nil {
return 0, fmt.Errorf("Error in loading snapshot from hubbleDB: err=%v", err)
} else {
log.Info("memory DB snapshot loaded from hubbleDB", "acceptedBlockNumber", snapshot.AcceptedBlockNumber)
return snapshot.AcceptedBlockNumber.Uint64(), nil
}
} else {
return 0, nil
}
}

func (lop *limitOrderProcesser) loadMemoryDBSnapshotFromFile() (uint64, error) {
if lop.snapshotFilePath == "" {
return 0, fmt.Errorf("snapshot file path not set")
}

memorySnapshotBytes, err := os.ReadFile(lop.snapshotFilePath)
if err != nil {
return 0, fmt.Errorf("Error in reading snapshot file: err=%v", err)
}

buf := bytes.NewBuffer(memorySnapshotBytes)
var snapshot orderbook.Snapshot
err = gob.NewDecoder(buf).Decode(&snapshot)
if err != nil {
return acceptedBlockNumber, fmt.Errorf("Error in snapshot parsing; err=%v", err)
return 0, fmt.Errorf("Error in snapshot parsing from file; err=%v", err)
}

if snapshot.AcceptedBlockNumber != nil && snapshot.AcceptedBlockNumber.Uint64() > 0 {
err = lop.memoryDb.LoadFromSnapshot(snapshot)
if err != nil {
return acceptedBlockNumber, fmt.Errorf("Error in loading from snapshot: err=%v", err)
return 0, fmt.Errorf("Error in loading snapshot from file: err=%v", err)
} else {
log.Info("memory DB snapshot loaded from file", "acceptedBlockNumber", snapshot.AcceptedBlockNumber)
}

return snapshot.AcceptedBlockNumber.Uint64(), nil
} else {
return acceptedBlockNumber, nil
return 0, nil
}
}

Expand All @@ -387,6 +430,10 @@ func (lop *limitOrderProcesser) saveMemoryDBSnapshot(acceptedBlockNumber *big.In
start := time.Now()
currentHeadBlock := lop.blockChain.CurrentBlock()

if lop.snapshotFilePath == "" {
return fmt.Errorf("snapshot file path not set")
}

memoryDBCopy, err := lop.memoryDb.GetOrderBookDataCopy()
if err != nil {
return fmt.Errorf("Error in getting memory DB copy: err=%v", err)
Expand Down Expand Up @@ -428,9 +475,11 @@ func (lop *limitOrderProcesser) saveMemoryDBSnapshot(acceptedBlockNumber *big.In
return fmt.Errorf("error in gob encoding: err=%v", err)
}

err = lop.hubbleDB.Put([]byte(memoryDBSnapshotKey), buf.Bytes())
snapshotBytes := buf.Bytes()
// write to snapshot file
err = os.WriteFile(lop.snapshotFilePath, snapshotBytes, 0644)
if err != nil {
return fmt.Errorf("Error in saving to DB: err=%v", err)
return fmt.Errorf("Error in writing to snapshot file: err=%v", err)
}

lop.snapshotSavedBlockNumber = acceptedBlockNumber.Uint64()
Expand Down
11 changes: 6 additions & 5 deletions plugin/evm/orderbook/errors.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package orderbook

const (
HandleChainAcceptedEventPanicMessage = "panic while processing chainAcceptedEvent"
HandleChainAcceptedLogsPanicMessage = "panic while processing chainAcceptedLogs"
HandleHubbleFeedLogsPanicMessage = "panic while processing hubbleFeedLogs"
RunMatchingPipelinePanicMessage = "panic while running matching pipeline"
RunSanitaryPipelinePanicMessage = "panic while running sanitary pipeline"
HandleChainAcceptedEventPanicMessage = "panic while processing chainAcceptedEvent"
HandleChainAcceptedLogsPanicMessage = "panic while processing chainAcceptedLogs"
HandleHubbleFeedLogsPanicMessage = "panic while processing hubbleFeedLogs"
RunMatchingPipelinePanicMessage = "panic while running matching pipeline"
RunSanitaryPipelinePanicMessage = "panic while running sanitary pipeline"
MakerBookFileWriteChannelPanicMessage = "panic while sending to makerbook file write channel"
)
7 changes: 7 additions & 0 deletions plugin/evm/orderbook/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
HandleMatchingPipelineTimerPanicsCounter = metrics.NewRegisteredCounter("handle_matching_pipeline_timer_panics", nil)
RPCPanicsCounter = metrics.NewRegisteredCounter("rpc_panic", nil)
AwaitSignedOrdersGossipPanicsCounter = metrics.NewRegisteredCounter("await_signed_orders_gossip_panics", nil)
MakerbookFileWriteChannelPanicsCounter = metrics.NewRegisteredCounter("makerbook_file_write_channel_panics", nil)

BuildBlockFailedWithLowBlockGasCounter = metrics.NewRegisteredCounter("build_block_failed_low_block_gas", nil)

Expand All @@ -39,4 +40,10 @@ var (
// unquenched liquidations
unquenchedLiquidationsCounter = metrics.NewRegisteredCounter("unquenched_liquidations", nil)
placeSignedOrderCounter = metrics.NewRegisteredCounter("place_signed_order", nil)

// makerbook write failures
makerBookWriteFailuresCounter = metrics.NewRegisteredCounter("makerbook_write_failures", nil)

// snapshot write failures
SnapshotWriteFailuresCounter = metrics.NewRegisteredCounter("snapshot_write_failures", nil)
)
Loading

0 comments on commit 2aa8374

Please sign in to comment.