diff --git a/agent/app.py b/agent/app.py index dcd8cac..ec28e06 100644 --- a/agent/app.py +++ b/agent/app.py @@ -1,21 +1,20 @@ # encoding: utf-8 import logging -import requests from time import sleep from agent.utils import convert_int from agent.ckb_indexer import CKBIndexer, token_dict from agent.ckb_rpc import CkbRpc from agent.godwoken_rpc import GodwokenRpc -from agent.gw_config import GwConfig, devnet_config, testnet_config, \ +from agent.gw_config import devnet_config, testnet_config, \ testnet_v1_1_config, mainnet_config, mainnet_v1_config +from agent.sched_custodian import get_custodian import prometheus_client -from prometheus_client.core import CollectorRegistry, Gauge, Info +from prometheus_client.core import CollectorRegistry, Gauge from flask import Response, Flask import os import threading -from agent.sched_custodian import get_custodian logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) DISABLE_CUSTODIAN_STATS = 'DISABLE_CUSTODIAN_STATS' @@ -28,19 +27,19 @@ BlockNumber = None ## GLOBAL METRICS -LastBlockNumber = None Ping = None LastBlockHash = None LastBlockTimestamp = None BlockTimeDifference = None TPS = None -CommitTransacionCount = None +CommitTransacionCount = 0 CustodianStats = None DepositCount = 0 DepositCapacity = 0 WithdrawalCount = 0 WithdrawalCapacity = 0 +TaskLock = threading.Lock() def get_gw_stat_by_lock(lock_name, gw_rpc: GodwokenRpc, block_hash, ckb_rpc: CkbRpc, gw_config): @@ -72,6 +71,61 @@ def get_gw_stat_by_lock(lock_name, gw_rpc: GodwokenRpc, block_hash, return (len(output_dict), sum(output_dict.values())) +""" +Update metrics in batch. Some metrics are cumulative. +""" +def update_metrics(tip_number, ping, last_block_hash: str, + last_block_ts: int, + block_time_diff: int, + tps, + tx_cnt: int, + deposit_cnt, deposit_capacity, + withdrawal_cnt, withdrawal_capacity): + with TaskLock: + global BlockNumber + global Ping + global LastBlockHash + global LastBlockTimestamp + global BlockTimeDifference + global TPS + global CommitTransacionCount + global DepositCount + global DepositCapacity + global WithdrawalCount + global WithdrawalCapacity + + BlockNumber = tip_number + Ping = ping + LastBlockHash = last_block_hash + LastBlockTimestamp = last_block_ts + BlockTimeDifference = block_time_diff + TPS = tps + CommitTransacionCount += tx_cnt + DepositCount += deposit_cnt + DepositCapacity += deposit_capacity + WithdrawalCount += withdrawal_cnt + WithdrawalCapacity += withdrawal_capacity + + +""" +Reset metrics after being scraped by grafana successfully. +""" +def reset_metrics(): + with TaskLock: + global CommitTransacionCount + global DepositCount + global DepositCapacity + global WithdrawalCount + global WithdrawalCapacity + CommitTransacionCount = 0 + DepositCount = 0 + DepositCapacity = 0 + WithdrawalCount = 0 + WithdrawalCapacity = 0 + +""" +General metrics job. +""" class JobThread(threading.Thread): def __init__(self): @@ -110,85 +164,106 @@ def __init__(self): ) self.gw_config = testnet_config() + """ + Always try to get new tip. + """ def run(self): global BlockNumber - global LastBlockNumber - global Ping - ## remove Web3Version - ##global Web3Version - global LastBlockHash - global LastBlockTimestamp - global BlockTimeDifference - global TPS - global CommitTransacionCount - global CustodianStats - global DepositCount - global DepositCapacity - global WithdrawalCount - global WithdrawalCapacity while True: - sleep(5) + sleep(1) logging.info("Start running") if BlockNumber is None: try: - LastBlockNumber = self.gw_rpc.get_tip_number() + tip_number = self.gw_rpc.get_tip_number() except: logging.exception("Cannot get tip number") continue else: - LastBlockNumber = BlockNumber + tip_number = BlockNumber + 1 try: - Ping = self.gw_rpc.ping() + ping = self.gw_rpc.ping() - block = self.gw_rpc.get_block_by_number(hex(LastBlockNumber)) - LastBlockHash = block['hash'] - LastBlockTimestamp = convert_int(block['raw']['timestamp']) - CommitTransacionCount = len(block['transactions']) + block = self.gw_rpc.get_block_by_number(hex(tip_number)) + last_block_hash = block['hash'] + last_block_ts = convert_int(block['raw']['timestamp']) + tx_cnt = len(block['transactions']) previous_block_hash = block['raw']['parent_block_hash'] previous_block = self.gw_rpc.get_block(previous_block_hash) previous_block_time = convert_int(previous_block['block']['raw']['timestamp']) - BlockTimeDifference = abs(LastBlockTimestamp - previous_block_time) + block_time_diff = abs(last_block_ts - previous_block_time) - TPS = CommitTransacionCount / BlockTimeDifference * 1000 - except Exception as e: - ## ignore any exception - logging.error("get block info failed", exc_info=e) + if block_time_diff == 0: + tps = 0 + else: + tps = tx_cnt / block_time_diff * 1000 + except: + logging.exception("get block info failed") continue one_ckb = 100_000_000 - if DISABLE_CUSTODIAN_STATS not in os.environ: - logging.info("Loading custodian stats") - try: - CustodianStats = get_custodian( - self.ckb_indexer_url, self.gw_config, - LastBlockNumber) - except: - logging.exception("Failed to get custodian stats") logging.info("Loading deposit stats") try: - DepositCount, DepositCapacity = get_gw_stat_by_lock( + deposit_cnt, deposit_capacity = get_gw_stat_by_lock( "deposit_lock", self.gw_rpc, - LastBlockHash, self.ckb_rpc, + last_block_hash, self.ckb_rpc, self.gw_config) - DepositCapacity = DepositCapacity / one_ckb + deposit_capacity = deposit_capacity / one_ckb except: logging.exception("Failed to get deposit stats") + continue logging.info("Loading withdrawal stats") try: - WithdrawalCount, WithdrawalCapacity = get_gw_stat_by_lock( + withdrawal_cnt, withdrawal_capacity = get_gw_stat_by_lock( "withdrawal_lock", self.gw_rpc, - LastBlockHash, self.ckb_rpc, + last_block_hash, self.ckb_rpc, self.gw_config) - WithdrawalCapacity = WithdrawalCapacity / one_ckb + withdrawal_capacity = withdrawal_capacity / one_ckb except: logging.exception("Failed to get withdrawal stats") + continue + update_metrics( + tip_number, + ping, + last_block_hash, + last_block_ts, + block_time_diff, + tps, + tx_cnt, + deposit_cnt, + deposit_capacity, + withdrawal_cnt, + withdrawal_capacity) + + +""" +It takes a lot more time to calculate Custodian metrics than others. +And custodian metrics don't need to be real-time level. +So put this in a slower thread. +""" +class CustodianJobThread(JobThread): + def __init__(self): + super().__init__() + + def run(self): + global CustodianStats + while True: + sleep(10) + logging.info("Loading custodian stats") + try: + CustodianStats = get_custodian( + self.ckb_indexer_url, self.gw_config, + BlockNumber) + except: + logging.exception("Failed to get custodian stats") job = JobThread() job.start() +custodian_job = CustodianJobThread() +custodian_job.start() @NodeFlask.route("/metrics/godwoken/") @NodeFlask.route( @@ -197,7 +272,8 @@ def run(self): ) def exporter(block_number=None): global BlockNumber - BlockNumber = block_number + if block_number is not None: + BlockNumber = block_number registry = CollectorRegistry(auto_describe=False) last_block_number = Gauge("Node_Get_LastBlockNumber", @@ -319,8 +395,7 @@ def exporter(block_number=None): registry=registry, ) - global LastBlockNumber - last_block_number.labels(gw_rpc_url=gw_rpc_url).set(LastBlockNumber) + last_block_number.labels(gw_rpc_url=gw_rpc_url).set(BlockNumber) node_gw_ping.labels(gw_rpc_url=gw_rpc_url, gw_ping=Ping).set(1) @@ -328,7 +403,7 @@ def exporter(block_number=None): node_LastBlockInfo.labels( gw_rpc_url=gw_rpc_url, last_block_hash=LastBlockHash, - last_blocknumber=LastBlockNumber, + last_blocknumber=BlockNumber, last_block_timestamp=LastBlockTimestamp, ).set(BlockTimeDifference) @@ -369,5 +444,6 @@ def exporter(block_number=None): gw_withdrawal_cnt.labels(gw_rpc_url=gw_rpc_url).set(WithdrawalCount) gw_withdrawal_capacity.labels(gw_rpc_url=gw_rpc_url).set(WithdrawalCapacity) + reset_metrics() return Response(prometheus_client.generate_latest(registry), mimetype="text/plain") diff --git a/agent/godwoken_rpc.py b/agent/godwoken_rpc.py index 93403bf..be172d0 100644 --- a/agent/godwoken_rpc.py +++ b/agent/godwoken_rpc.py @@ -107,6 +107,7 @@ def get_block(self, block_hash): def get_tip_number(self): tip_block_hash = self.get_tip_block_hash() + print(f'tip block hash: {tip_block_hash}') tip = self.get_block(tip_block_hash) tip_number = tip['block']['raw']['number'] return convert_int(tip_number) diff --git a/agent/json_rpc.py b/agent/json_rpc.py index c57477e..4e3e3f3 100644 --- a/agent/json_rpc.py +++ b/agent/json_rpc.py @@ -24,7 +24,6 @@ def submit(self, method: str, params: List): r.raise_for_status() return r.json()["result"] except Exception as e: - print("test") logging.error(f"Submit request to {self.url}, method: {method}, params: {params}", exc_info = e) raise RPCException from None