Node/CCQ: Server auto reconnect (#3611)
bruce-riley authored Dec 21, 2023
1 parent da77168 commit 8590089
Showing 5 changed files with 53 additions and 13 deletions.
6 changes: 6 additions & 0 deletions node/cmd/ccq/metrics.go
@@ -72,4 +72,10 @@ var (
Name: "ccq_server_perm_file_reload_failure",
Help: "Total number of times the permissions file failed to reload",
})

successfulReconnects = promauto.NewCounter(
prometheus.CounterOpts{
Name: "ccq_server_total_number_of_successful_reconnects",
Help: "Total number of successful reconnects to bootstrap peers",
})
)
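For reference, here is a minimal sketch of how the new counter could be exercised in a unit test. It assumes the standard prometheus/client_golang testutil helpers and shows the package clause as ccq for illustration; the test itself is hypothetical and not part of this commit.

package ccq

import (
    "testing"

    "github.com/prometheus/client_golang/prometheus/testutil"
)

// Hypothetical test: simulate one successful reconnect and check the counter.
func TestSuccessfulReconnectsCounter(t *testing.T) {
    before := testutil.ToFloat64(successfulReconnects)
    successfulReconnects.Inc() // stands in for a real reconnect in runP2P
    if got := testutil.ToFloat64(successfulReconnects) - before; got != 1 {
        t.Fatalf("expected counter to increase by 1, got %v", got)
    }
}
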
34 changes: 33 additions & 1 deletion node/cmd/ccq/p2p.go
@@ -37,7 +37,7 @@ type P2PSub struct {
host host.Host
}

func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, bootstrapPeers, ethRpcUrl, ethCoreAddr string, pendingResponses *PendingResponses, logger *zap.Logger) (*P2PSub, error) {
func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, bootstrapPeers, ethRpcUrl, ethCoreAddr string, pendingResponses *PendingResponses, logger *zap.Logger, monitorPeers bool) (*P2PSub, error) {
// p2p setup
components := p2p.DefaultComponents()
components.Port = port
@@ -96,6 +96,38 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
}
logger.Info("Found peers", zap.Int("numPeers", len(th_req.ListPeers())))

if monitorPeers {
logger.Info("Will monitor for missing peers once per minute.")
go func() {
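// Once per minute, check that each configured bootstrap peer is still visible on the query request topic and re-dial any that have dropped off.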
t := time.NewTicker(time.Minute)
for {
select {
case <-ctx.Done():
logger.Info("Context cancelled, exiting peer monitoring.")
return
case <-t.C:
peers := th_req.ListPeers()
logger.Info("current peers", zap.Int("numPeers", len(peers)), zap.Any("peers", peers))
peerMap := map[string]struct{}{}
for _, peer := range peers {
peerMap[peer.String()] = struct{}{}
}
for _, p := range bootstrappers {
if _, exists := peerMap[p.ID.String()]; !exists {
logger.Info("attempting to reconnect to peer", zap.String("peer", p.ID.String()))
if err := h.Connect(ctx, p); err != nil {
logger.Error("failed to reconnect to peer", zap.String("peer", p.ID.String()), zap.Error(err))
} else {
logger.Info("Reconnected to peer", zap.String("peer", p.ID.String()))
peerMap[p.ID.String()] = struct{}{}
successfulReconnects.Inc()
}
}
}
}
}
}()
}

// Fetch the initial current guardian set
guardianSet, err := FetchCurrentGuardianSet(ethRpcUrl, ethCoreAddr)
if err != nil {
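The monitoring goroutine added above compares the peer IDs currently seen on the request topic against the configured bootstrap peers. As a rough sketch of the same check factored into a standalone helper, which the ticker case could call with th_req.ListPeers() and the bootstrap list (the function name, signature, and package clause are illustrative, not part of this change):

package ccq

import (
    "context"

    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/peer"
    "go.uber.org/zap"
)

// reconnectMissingBootstrapPeers re-dials every configured bootstrap peer that
// is not in the current peer list, incrementing the metric on each success.
func reconnectMissingBootstrapPeers(ctx context.Context, logger *zap.Logger, h host.Host, current []peer.ID, bootstrappers []peer.AddrInfo) {
    seen := make(map[peer.ID]struct{}, len(current))
    for _, id := range current {
        seen[id] = struct{}{}
    }
    for _, p := range bootstrappers {
        if _, ok := seen[p.ID]; ok {
            continue
        }
        logger.Info("attempting to reconnect to peer", zap.String("peer", p.ID.String()))
        if err := h.Connect(ctx, p); err != nil {
            logger.Error("failed to reconnect to peer", zap.String("peer", p.ID.String()), zap.Error(err))
            continue
        }
        logger.Info("reconnected to peer", zap.String("peer", p.ID.String()))
        successfulReconnects.Inc()
    }
}

Factored this way, the reconnect logic could be exercised against a mock host.Host in a unit test instead of a live gossip network.
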
4 changes: 3 additions & 1 deletion node/cmd/ccq/query_server.go
@@ -44,6 +44,7 @@ var (
promRemoteURL *string
shutdownDelay1 *uint
shutdownDelay2 *uint
monitorPeers *bool
)

const DEV_NETWORK_ID = "/wormhole/dev"
@@ -64,6 +65,7 @@ func init() {
telemetryNodeName = QueryServerCmd.Flags().String("telemetryNodeName", "", "Node name used in telemetry")
statusAddr = QueryServerCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
promRemoteURL = QueryServerCmd.Flags().String("promRemoteURL", "", "Prometheus remote write URL (Grafana)")
monitorPeers = QueryServerCmd.Flags().Bool("monitorPeers", false, "Should monitor bootstrap peers and attempt to reconnect")

// The default health check monitoring is every five seconds, with a five second timeout, and you have to miss two, for 20 seconds total.
shutdownDelay1 = QueryServerCmd.Flags().Uint("shutdownDelay1", 25, "Seconds to delay after disabling health check on shutdown")
@@ -168,7 +170,7 @@ func runQueryServer(cmd *cobra.Command, args []string) {

// Run p2p
pendingResponses := NewPendingResponses()
p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger)
p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers)
if err != nil {
logger.Fatal("Failed to start p2p", zap.Error(err))
}
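Note that peer monitoring is opt-in: the goroutine only runs when the query server is started with --monitorPeers, which defaults to false, so existing deployments keep their current behavior unless the flag is added to the command line.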
8 changes: 4 additions & 4 deletions node/pkg/query/query.go
@@ -200,7 +200,7 @@ func handleQueryRequestsImpl(
signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])

if _, exists := allowedRequestors[signerAddress]; !exists {
qLogger.Error("invalid requestor", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID))
qLogger.Debug("invalid requestor", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID))
invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc()
continue
}
@@ -235,15 +235,15 @@ func handleQueryRequestsImpl(
for requestIdx, pcq := range queryRequest.PerChainQueries {
chainID := vaa.ChainID(pcq.ChainId)
if _, exists := supportedChains[chainID]; !exists {
qLogger.Error("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
qLogger.Debug("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
errorFound = true
break
}

channel, channelExists := chainQueryReqC[chainID]
if !channelExists {
qLogger.Error("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID))
qLogger.Debug("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID))
invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc()
errorFound = true
break
@@ -363,7 +363,7 @@ func handleQueryRequestsImpl(
timeout := pq.receiveTime.Add(requestTimeoutImpl)
qLogger.Debug("audit", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime), zap.Stringer("timeout", timeout))
if timeout.Before(now) {
qLogger.Error("query request timed out, dropping it", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime))
qLogger.Debug("query request timed out, dropping it", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime))
queryRequestsTimedOut.Inc()
delete(pendingQueries, reqId)
} else {
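The changes in this file only lower the log severity for conditions that are expected in normal operation (unauthorized requestor, unsupported or unknown chain, request timeout); the invalidQueryRequestReceived and queryRequestsTimedOut counters still record every occurrence. As a small standalone zap example, separate from the node code, of why this quiets production logs: Debug entries are dropped when the logger is built at Info level, the default for zap.NewProductionConfig.

package main

import "go.uber.org/zap"

func main() {
    // zap.NewProductionConfig defaults to InfoLevel, so Debug output is filtered.
    logger, err := zap.NewProductionConfig().Build()
    if err != nil {
        panic(err)
    }
    defer logger.Sync()

    logger.Debug("invalid requestor", zap.String("requestID", "example")) // suppressed at Info level
    logger.Info("query server running")                                   // still emitted
}
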
14 changes: 7 additions & 7 deletions node/pkg/watchers/evm/ccq.go
@@ -134,7 +134,7 @@ func (w *Watcher) ccqHandleEthCallQueryRequest(ctx context.Context, queryRequest

// Verify that the block read was successful.
if err := w.ccqVerifyBlockResult(blockError, blockResult); err != nil {
w.ccqLogger.Error("failed to verify block for eth_call query",
w.ccqLogger.Debug("failed to verify block for eth_call query",
zap.String("requestId", requestId),
zap.String("block", block),
zap.Any("batch", batch),
@@ -156,7 +156,7 @@ func (w *Watcher) ccqHandleEthCallQueryRequest(ctx context.Context, queryRequest
// Verify all the call results and build the batch of results.
results, err := w.ccqVerifyAndExtractQueryResults(requestId, evmCallData)
if err != nil {
w.ccqLogger.Error("failed to process eth_call query call request",
w.ccqLogger.Debug("failed to process eth_call query call request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.Any("batch", batch),
@@ -311,7 +311,7 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q

// Verify the target block read was successful.
if err := w.ccqVerifyBlockResult(blockError, blockResult); err != nil {
w.ccqLogger.Error("failed to process eth_call_by_timestamp query target block request",
w.ccqLogger.Debug("failed to verify target block for eth_call_by_timestamp query",
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
@@ -324,7 +324,7 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q

// Verify the following block read was successful.
if err := w.ccqVerifyBlockResult(nextBlockError, nextBlockResult); err != nil {
w.ccqLogger.Error("failed to process eth_call_by_timestamp query following block request",
w.ccqLogger.Debug("failed to verify next block for eth_call_by_timestamp query",
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
@@ -399,7 +399,7 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
// Verify all the call results and build the batch of results.
results, err := w.ccqVerifyAndExtractQueryResults(requestId, evmCallData)
if err != nil {
w.ccqLogger.Error("failed to process eth_call_by_timestamp query call request",
w.ccqLogger.Debug("failed to process eth_call_by_timestamp query call request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
@@ -493,7 +493,7 @@ func (w *Watcher) ccqHandleEthCallWithFinalityQueryRequest(ctx context.Context,

// Verify that the block read was successful.
if err := w.ccqVerifyBlockResult(blockError, blockResult); err != nil {
w.ccqLogger.Error("failed to process eth_call_with_finality query block request",
w.ccqLogger.Debug("failed to verify block for eth_call_with_finality query",
zap.String("requestId", requestId),
zap.String("block", block),
zap.Any("batch", batch),
@@ -539,7 +539,7 @@ func (w *Watcher) ccqHandleEthCallWithFinalityQueryRequest(ctx context.Context,
// Verify all the call results and build the batch of results.
results, err := w.ccqVerifyAndExtractQueryResults(requestId, evmCallData)
if err != nil {
w.ccqLogger.Error("failed to process eth_call_with_finality query call request",
w.ccqLogger.Debug("failed to process eth_call_with_finality query call request",
zap.String("requestId", requestId),
zap.String("finality", req.Finality),
zap.Uint64("requestedBlockNumber", blockNumber),
