From 2aa83749a8369df10c1fb13003e76560461a0427 Mon Sep 17 00:00:00 2001 From: Shubham Date: Wed, 28 Feb 2024 11:47:22 +0530 Subject: [PATCH] Makerbook storage and save snapshot to file (#160) * Store makerbook and snapshot to file * Update default paths * Save snapshots only to the file * minor change * Fix log * Revert "Save snapshots only to the file" This reverts commit 77e92eafb03d7d0a76a143822f4ea3622284d37e. * Save snapshot only to file * Fix log again * Load snapshot from file first * Return error when file path is not set * Review fixes * Review changes * tiny changes for more idiomatic code * use channels to queue messages to write to file * stop closing the channel within consumer logic when shutdown signal comes * fix ineffectual assignment lint error * fix order struct * Add missing Id set --------- Co-authored-by: wallydrag --- chain.json | 4 +- network-configs/aylin/chain.json | 4 +- network-configs/aylin/chain_api_node.json | 4 +- .../aylin/chain_archival_node.json | 4 +- network-configs/hubblenet/chain_api_node.json | 4 +- .../hubblenet/chain_archival_node.json | 4 +- .../hubblenet/chain_validator_1.json | 3 +- plugin/evm/config.go | 10 ++ plugin/evm/limit_order.go | 69 ++++++++++++-- plugin/evm/orderbook/errors.go | 11 ++- plugin/evm/orderbook/metrics.go | 7 ++ plugin/evm/orderbook/trading_apis.go | 91 ++++++++++++++++--- plugin/evm/vm.go | 1 + utils/file.go | 20 ++++ 14 files changed, 201 insertions(+), 35 deletions(-) create mode 100644 utils/file.go diff --git a/chain.json b/chain.json index fc5c56a658..20a6c96b20 100644 --- a/chain.json +++ b/chain.json @@ -10,5 +10,7 @@ "is-validator": true, "trading-api-enabled": true, "testing-api-enabled": true, - "load-from-snapshot-enabled": true + "load-from-snapshot-enabled": true, + "snapshot-file-path": "/tmp/snapshot", + "makerbook-database-path": "/tmp/makerbook" } diff --git a/network-configs/aylin/chain.json b/network-configs/aylin/chain.json index 28a662f770..a01a49286f 100644 --- a/network-configs/aylin/chain.json +++ b/network-configs/aylin/chain.json @@ -8,5 +8,7 @@ "priority-regossip-addresses": ["0x06CCAD927e6B1d36E219Cb582Af3185D0705f78F"], "validator-private-key-file": "/home/ubuntu/validator.pk", "feeRecipient": "", - "is-validator": true + "is-validator": true, + "snapshot-file-path": "/tmp/snapshot", + "makerbook-database-path": "/tmp/makerbook" } diff --git a/network-configs/aylin/chain_api_node.json b/network-configs/aylin/chain_api_node.json index 6a61c830b9..708c0556d3 100644 --- a/network-configs/aylin/chain_api_node.json +++ b/network-configs/aylin/chain_api_node.json @@ -9,5 +9,7 @@ "coreth-admin-api-enabled": true, "eth-apis": ["public-eth","public-eth-filter","net","web3","internal-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"], "trading-api-enabled": true, - "testing-api-enabled": true + "testing-api-enabled": true, + "snapshot-file-path": "/tmp/snapshot", + "makerbook-database-path": "/tmp/makerbook" } diff --git a/network-configs/aylin/chain_archival_node.json b/network-configs/aylin/chain_archival_node.json index cf2c361794..2d43af04de 100644 --- a/network-configs/aylin/chain_archival_node.json +++ b/network-configs/aylin/chain_archival_node.json @@ -10,5 +10,7 @@ "coreth-admin-api-enabled": true, "eth-apis": ["public-eth","public-eth-filter","net","web3","internal-public-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"], "trading-api-enabled": true, - "testing-api-enabled": true + "testing-api-enabled": true, + "snapshot-file-path": "/tmp/snapshot", + "makerbook-database-path": "/tmp/makerbook" } diff --git a/network-configs/hubblenet/chain_api_node.json b/network-configs/hubblenet/chain_api_node.json index 2deb5bf5c5..18ec743005 100644 --- a/network-configs/hubblenet/chain_api_node.json +++ b/network-configs/hubblenet/chain_api_node.json @@ -12,5 +12,7 @@ "admin-api-enabled": true, "eth-apis": ["public-eth","public-eth-filter","net","web3","internal-public-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"], "trading-api-enabled": true, - "testing-api-enabled": true + "testing-api-enabled": true, + "snapshot-file-path": "/tmp/snapshot", + "makerbook-database-path": "/tmp/makerbook" } diff --git a/network-configs/hubblenet/chain_archival_node.json b/network-configs/hubblenet/chain_archival_node.json index dfcf759fb4..2471a78dc8 100644 --- a/network-configs/hubblenet/chain_archival_node.json +++ b/network-configs/hubblenet/chain_archival_node.json @@ -13,5 +13,7 @@ "admin-api-enabled": true, "eth-apis": ["public-eth","public-eth-filter","net","web3","internal-public-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"], "trading-api-enabled": true, - "testing-api-enabled": true + "testing-api-enabled": true, + "snapshot-file-path": "/tmp/snapshot", + "makerbook-database-path": "/tmp/makerbook" } diff --git a/network-configs/hubblenet/chain_validator_1.json b/network-configs/hubblenet/chain_validator_1.json index 6f8d868eaa..b5694f7d3a 100644 --- a/network-configs/hubblenet/chain_validator_1.json +++ b/network-configs/hubblenet/chain_validator_1.json @@ -11,5 +11,6 @@ "continuous-profiler-frequency": "10m", "validator-private-key-file": "/var/avalanche/validator.pk", "feeRecipient": "0xa5e31FbE901362Cc93b6fdab99DB9741c673a942", - "is-validator": true + "is-validator": true, + "snapshot-file-path": "/tmp/snapshot" } diff --git a/plugin/evm/config.go b/plugin/evm/config.go index a165fe89f5..5cf2027eab 100644 --- a/plugin/evm/config.go +++ b/plugin/evm/config.go @@ -63,6 +63,8 @@ const ( defaultIsValidator = false defaultTradingAPIEnabled = false defaultLoadFromSnapshotEnabled = true + defaultSnapshotFilePath = "/tmp/snapshot" + defaultMakerbookDatabasePath = "/tmp/makerbook" ) var ( @@ -242,6 +244,12 @@ type Config struct { // LoadFromSnapshotEnabled = true if the node should load the memory db from a snapshot LoadFromSnapshotEnabled bool `json:"load-from-snapshot-enabled"` + + // SnapshotFilePath is the path to the file which saves the latest snapshot bytes + SnapshotFilePath string `json:"snapshot-file-path"` + + // MakerbookDatabasePath is the path to the file which saves the makerbook orders + MakerbookDatabasePath string `json:"makerbook-database-path"` } // EthAPIs returns an array of strings representing the Eth APIs that should be enabled @@ -306,6 +314,8 @@ func (c *Config) SetDefaults() { c.IsValidator = defaultIsValidator c.TradingAPIEnabled = defaultTradingAPIEnabled c.LoadFromSnapshotEnabled = defaultLoadFromSnapshotEnabled + c.SnapshotFilePath = defaultSnapshotFilePath + c.MakerbookDatabasePath = defaultMakerbookDatabasePath } func (d *Duration) UnmarshalJSON(data []byte) (err error) { diff --git a/plugin/evm/limit_order.go b/plugin/evm/limit_order.go index 8436cd6f10..7ddcbee89c 100644 --- a/plugin/evm/limit_order.go +++ b/plugin/evm/limit_order.go @@ -6,6 +6,7 @@ import ( "encoding/gob" "fmt" "math/big" + "os" "runtime" "runtime/debug" "sync" @@ -58,6 +59,7 @@ type limitOrderProcesser struct { tradingAPIEnabled bool loadFromSnapshotEnabled bool snapshotSavedBlockNumber uint64 + snapshotFilePath string tradingAPI *orderbook.TradingAPI } @@ -109,6 +111,7 @@ func NewLimitOrderProcesser(ctx *snow.Context, txPool *txpool.TxPool, shutdownCh isValidator: config.IsValidator, tradingAPIEnabled: config.TradingAPIEnabled, loadFromSnapshotEnabled: config.LoadFromSnapshotEnabled, + snapshotFilePath: config.SnapshotFilePath, } } @@ -128,7 +131,6 @@ func (lop *limitOrderProcesser) ListenAndProcessTransactions(blockBuilder *block } else { if acceptedBlockNumber > 0 { fromBlock = big.NewInt(int64(acceptedBlockNumber) + 1) - log.Info("ListenAndProcessTransactions - memory DB snapshot loaded", "acceptedBlockNumber", acceptedBlockNumber) } else { // not an error, but unlikely after the blockchain is running for some time log.Warn("ListenAndProcessTransactions - no snapshot found") @@ -193,7 +195,7 @@ func (lop *limitOrderProcesser) GetOrderBookAPI() *orderbook.OrderBookAPI { func (lop *limitOrderProcesser) GetTradingAPI() *orderbook.TradingAPI { if lop.tradingAPI == nil { - lop.tradingAPI = orderbook.NewTradingAPI(lop.memoryDb, lop.backend, lop.configService) + lop.tradingAPI = orderbook.NewTradingAPI(lop.memoryDb, lop.backend, lop.configService, lop.shutdownChan, lop.shutdownWg) } return lop.tradingAPI } @@ -268,6 +270,7 @@ func (lop *limitOrderProcesser) listenAndStoreLimitOrderTransactions() { lop.memoryDb.Accept(snapshotBlockNumber, snapshotBlock.Timestamp()) err := lop.saveMemoryDBSnapshot(big.NewInt(int64(snapshotBlockNumber))) if err != nil { + orderbook.SnapshotWriteFailuresCounter.Inc(1) log.Error("Error in saving memory DB snapshot", "err", err, "snapshotBlockNumber", snapshotBlockNumber, "current blockNumber", blockNumber, "blockNumberFloor", blockNumberFloor) } } @@ -349,36 +352,76 @@ func (lop *limitOrderProcesser) runMatchingTimer() { } func (lop *limitOrderProcesser) loadMemoryDBSnapshot() (acceptedBlockNumber uint64, err error) { + acceptedBlockNumber, err = lop.loadMemoryDBSnapshotFromFile() + if err != nil || acceptedBlockNumber == 0 { + acceptedBlockNumber, err = lop.loadMemoryDBSnapshotFromHubbleDB() + } + return acceptedBlockNumber, err +} + +func (lop *limitOrderProcesser) loadMemoryDBSnapshotFromHubbleDB() (uint64, error) { snapshotFound, err := lop.hubbleDB.Has([]byte(memoryDBSnapshotKey)) if err != nil { - return acceptedBlockNumber, fmt.Errorf("Error in checking snapshot in hubbleDB: err=%v", err) + return 0, fmt.Errorf("Error in checking snapshot in hubbleDB: err=%v", err) } if !snapshotFound { - return acceptedBlockNumber, nil + return 0, nil } memorySnapshotBytes, err := lop.hubbleDB.Get([]byte(memoryDBSnapshotKey)) if err != nil { - return acceptedBlockNumber, fmt.Errorf("Error in fetching snapshot from hubbleDB; err=%v", err) + return 0, fmt.Errorf("Error in fetching snapshot from hubbleDB; err=%v", err) + } + + buf := bytes.NewBuffer(memorySnapshotBytes) + var snapshot orderbook.Snapshot + err = gob.NewDecoder(buf).Decode(&snapshot) + if err != nil { + return 0, fmt.Errorf("Error in snapshot parsing from hubbleDB; err=%v", err) + } + + if snapshot.AcceptedBlockNumber != nil && snapshot.AcceptedBlockNumber.Uint64() > 0 { + err = lop.memoryDb.LoadFromSnapshot(snapshot) + if err != nil { + return 0, fmt.Errorf("Error in loading snapshot from hubbleDB: err=%v", err) + } else { + log.Info("memory DB snapshot loaded from hubbleDB", "acceptedBlockNumber", snapshot.AcceptedBlockNumber) + return snapshot.AcceptedBlockNumber.Uint64(), nil + } + } else { + return 0, nil + } +} + +func (lop *limitOrderProcesser) loadMemoryDBSnapshotFromFile() (uint64, error) { + if lop.snapshotFilePath == "" { + return 0, fmt.Errorf("snapshot file path not set") + } + + memorySnapshotBytes, err := os.ReadFile(lop.snapshotFilePath) + if err != nil { + return 0, fmt.Errorf("Error in reading snapshot file: err=%v", err) } buf := bytes.NewBuffer(memorySnapshotBytes) var snapshot orderbook.Snapshot err = gob.NewDecoder(buf).Decode(&snapshot) if err != nil { - return acceptedBlockNumber, fmt.Errorf("Error in snapshot parsing; err=%v", err) + return 0, fmt.Errorf("Error in snapshot parsing from file; err=%v", err) } if snapshot.AcceptedBlockNumber != nil && snapshot.AcceptedBlockNumber.Uint64() > 0 { err = lop.memoryDb.LoadFromSnapshot(snapshot) if err != nil { - return acceptedBlockNumber, fmt.Errorf("Error in loading from snapshot: err=%v", err) + return 0, fmt.Errorf("Error in loading snapshot from file: err=%v", err) + } else { + log.Info("memory DB snapshot loaded from file", "acceptedBlockNumber", snapshot.AcceptedBlockNumber) } return snapshot.AcceptedBlockNumber.Uint64(), nil } else { - return acceptedBlockNumber, nil + return 0, nil } } @@ -387,6 +430,10 @@ func (lop *limitOrderProcesser) saveMemoryDBSnapshot(acceptedBlockNumber *big.In start := time.Now() currentHeadBlock := lop.blockChain.CurrentBlock() + if lop.snapshotFilePath == "" { + return fmt.Errorf("snapshot file path not set") + } + memoryDBCopy, err := lop.memoryDb.GetOrderBookDataCopy() if err != nil { return fmt.Errorf("Error in getting memory DB copy: err=%v", err) @@ -428,9 +475,11 @@ func (lop *limitOrderProcesser) saveMemoryDBSnapshot(acceptedBlockNumber *big.In return fmt.Errorf("error in gob encoding: err=%v", err) } - err = lop.hubbleDB.Put([]byte(memoryDBSnapshotKey), buf.Bytes()) + snapshotBytes := buf.Bytes() + // write to snapshot file + err = os.WriteFile(lop.snapshotFilePath, snapshotBytes, 0644) if err != nil { - return fmt.Errorf("Error in saving to DB: err=%v", err) + return fmt.Errorf("Error in writing to snapshot file: err=%v", err) } lop.snapshotSavedBlockNumber = acceptedBlockNumber.Uint64() diff --git a/plugin/evm/orderbook/errors.go b/plugin/evm/orderbook/errors.go index df7f92a021..9fc16380a5 100644 --- a/plugin/evm/orderbook/errors.go +++ b/plugin/evm/orderbook/errors.go @@ -1,9 +1,10 @@ package orderbook const ( - HandleChainAcceptedEventPanicMessage = "panic while processing chainAcceptedEvent" - HandleChainAcceptedLogsPanicMessage = "panic while processing chainAcceptedLogs" - HandleHubbleFeedLogsPanicMessage = "panic while processing hubbleFeedLogs" - RunMatchingPipelinePanicMessage = "panic while running matching pipeline" - RunSanitaryPipelinePanicMessage = "panic while running sanitary pipeline" + HandleChainAcceptedEventPanicMessage = "panic while processing chainAcceptedEvent" + HandleChainAcceptedLogsPanicMessage = "panic while processing chainAcceptedLogs" + HandleHubbleFeedLogsPanicMessage = "panic while processing hubbleFeedLogs" + RunMatchingPipelinePanicMessage = "panic while running matching pipeline" + RunSanitaryPipelinePanicMessage = "panic while running sanitary pipeline" + MakerBookFileWriteChannelPanicMessage = "panic while sending to makerbook file write channel" ) diff --git a/plugin/evm/orderbook/metrics.go b/plugin/evm/orderbook/metrics.go index cc42690f72..85b6318fcf 100644 --- a/plugin/evm/orderbook/metrics.go +++ b/plugin/evm/orderbook/metrics.go @@ -27,6 +27,7 @@ var ( HandleMatchingPipelineTimerPanicsCounter = metrics.NewRegisteredCounter("handle_matching_pipeline_timer_panics", nil) RPCPanicsCounter = metrics.NewRegisteredCounter("rpc_panic", nil) AwaitSignedOrdersGossipPanicsCounter = metrics.NewRegisteredCounter("await_signed_orders_gossip_panics", nil) + MakerbookFileWriteChannelPanicsCounter = metrics.NewRegisteredCounter("makerbook_file_write_channel_panics", nil) BuildBlockFailedWithLowBlockGasCounter = metrics.NewRegisteredCounter("build_block_failed_low_block_gas", nil) @@ -39,4 +40,10 @@ var ( // unquenched liquidations unquenchedLiquidationsCounter = metrics.NewRegisteredCounter("unquenched_liquidations", nil) placeSignedOrderCounter = metrics.NewRegisteredCounter("place_signed_order", nil) + + // makerbook write failures + makerBookWriteFailuresCounter = metrics.NewRegisteredCounter("makerbook_write_failures", nil) + + // snapshot write failures + SnapshotWriteFailuresCounter = metrics.NewRegisteredCounter("snapshot_write_failures", nil) ) diff --git a/plugin/evm/orderbook/trading_apis.go b/plugin/evm/orderbook/trading_apis.go index 04ef4c5d97..b1f3149c65 100644 --- a/plugin/evm/orderbook/trading_apis.go +++ b/plugin/evm/orderbook/trading_apis.go @@ -5,10 +5,12 @@ package orderbook import ( "context" + "encoding/json" "errors" "fmt" "math/big" "strings" + "sync" "time" "github.com/ava-labs/subnet-evm/eth" @@ -17,23 +19,46 @@ import ( "github.com/ava-labs/subnet-evm/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" ) var traderFeed event.Feed var marketFeed event.Feed +var MakerbookDatabaseFile string type TradingAPI struct { - db LimitOrderDatabase - backend *eth.EthAPIBackend - configService IConfigService + db LimitOrderDatabase + backend *eth.EthAPIBackend + configService IConfigService + makerbookFileWriteChan chan Order + shutdownChan <-chan struct{} + shutdownWg *sync.WaitGroup } -func NewTradingAPI(database LimitOrderDatabase, backend *eth.EthAPIBackend, configService IConfigService) *TradingAPI { - return &TradingAPI{ - db: database, - backend: backend, - configService: configService, +func NewTradingAPI(database LimitOrderDatabase, backend *eth.EthAPIBackend, configService IConfigService, shutdownChan <-chan struct{}, shutdownWg *sync.WaitGroup) *TradingAPI { + tradingAPI := &TradingAPI{ + db: database, + backend: backend, + configService: configService, + makerbookFileWriteChan: make(chan Order, 100), + shutdownChan: shutdownChan, + shutdownWg: shutdownWg, } + + shutdownWg.Add(1) + go func() { + defer shutdownWg.Done() + for { + select { + case order := <-tradingAPI.makerbookFileWriteChan: + writeOrderToFile(order) + case <-tradingAPI.shutdownChan: + return + } + } + }() + + return tradingAPI } type TradingOrderBookDepthResponse struct { @@ -97,13 +122,9 @@ var mapStatus = map[Status]string{ } func (api *TradingAPI) GetTradingOrderBookDepth(ctx context.Context, market int8) TradingOrderBookDepthResponse { - response := TradingOrderBookDepthResponse{ - Asks: [][]string{}, - Bids: [][]string{}, - } depth := getDepthForMarket(api.db, Market(market)) - response = transformMarketDepth(depth) + response := transformMarketDepth(depth) response.T = time.Now().Unix() return response @@ -340,6 +361,7 @@ func (api *TradingAPI) PlaceOrder(order *hu.SignedOrder) (common.Hash, error) { return orderId, err } if trader != signer && !api.configService.IsTradingAuthority(trader, signer) { + log.Error("not trading authority", "trader", trader.String(), "signer", signer.String()) return orderId, hu.ErrNoTradingAuthority } @@ -391,9 +413,21 @@ func (api *TradingAPI) PlaceOrder(order *hu.SignedOrder) (common.Hash, error) { RawOrder: order, OrderType: Signed, } + placeSignedOrderCounter.Inc(1) api.db.AddSignedOrder(signedOrder, requiredMargin) + if len(MakerbookDatabaseFile) > 0 { + go func() { + select { + case api.makerbookFileWriteChan <- *signedOrder: + log.Info("Successfully sent to the makerbook file write channel") + default: + log.Error("Failed to send to the makerbook file write channel", "order", signedOrder.Id.String()) + } + }() + } + // send to trader feed - both for head and accepted block go func() { orderMap := order.Map() @@ -422,3 +456,34 @@ func (api *TradingAPI) PlaceOrder(order *hu.SignedOrder) (common.Hash, error) { return orderId, nil } + +func writeOrderToFile(order Order) { + doc := map[string]interface{}{ + "type": "OrderAccepted", + "timestamp": time.Now().Unix(), + "trader": order.Trader.String(), + "orderHash": strings.ToLower(order.Id.String()), + "orderType": "signed", + "order": map[string]interface{}{ + "orderType": 2, + "expireAt": order.getExpireAt().Uint64(), + "ammIndex": int(order.Market), + "trader": order.Trader.String(), + "baseAssetQuantity": utils.BigIntToFloat(order.BaseAssetQuantity, 18), + "price": utils.BigIntToFloat(order.Price, 6), + "salt": order.Salt.Int64(), + "reduceOnly": order.ReduceOnly, + }, + } + jsonDoc, err := json.Marshal(doc) + if err != nil { + log.Error("writeOrderToFile: failed to marshal order", "err", err) + makerBookWriteFailuresCounter.Inc(1) + return + } + err = utils.AppendToFile(MakerbookDatabaseFile, jsonDoc) + if err != nil { + log.Error("writeOrderToFile: failed to write order to file", "err", err) + makerBookWriteFailuresCounter.Inc(1) + } +} diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index d8bdfd2e48..e9b1948624 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -1011,6 +1011,7 @@ func (vm *VM) CreateHandlers(context.Context) (map[string]http.Handler, error) { if err := handler.RegisterName("order", NewOrderAPI(vm.limitOrderProcesser.GetTradingAPI(), vm)); err != nil { return nil, err } + orderbook.MakerbookDatabaseFile = vm.config.MakerbookDatabasePath if err := handler.RegisterName("orderbook", vm.limitOrderProcesser.GetOrderBookAPI()); err != nil { return nil, err diff --git a/utils/file.go b/utils/file.go new file mode 100644 index 0000000000..b818ae7c37 --- /dev/null +++ b/utils/file.go @@ -0,0 +1,20 @@ +package utils + +import ( + "os" +) + +func AppendToFile(file string, data []byte) error { + f, err := os.OpenFile(file, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + // Add a newline to the beginning of the data + data = append([]byte("\n"), data...) + if _, err := f.Write(data); err != nil { + return err + } + return nil +}