Skip to content

Commit

Permalink
feat: cumulative metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
magicalne committed Sep 21, 2022
1 parent f06379b commit ee41cb8
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 52 deletions.
178 changes: 127 additions & 51 deletions agent/app.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
# encoding: utf-8

import logging
import requests
from time import sleep
from agent.utils import convert_int
from agent.ckb_indexer import CKBIndexer, token_dict
from agent.ckb_rpc import CkbRpc
from agent.godwoken_rpc import GodwokenRpc
from agent.gw_config import GwConfig, devnet_config, testnet_config, \
from agent.gw_config import devnet_config, testnet_config, \
testnet_v1_1_config, mainnet_config, mainnet_v1_config
from agent.sched_custodian import get_custodian
import prometheus_client
from prometheus_client.core import CollectorRegistry, Gauge, Info
from prometheus_client.core import CollectorRegistry, Gauge
from flask import Response, Flask
import os
import threading

from agent.sched_custodian import get_custodian
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)

DISABLE_CUSTODIAN_STATS = 'DISABLE_CUSTODIAN_STATS'
Expand All @@ -28,19 +27,19 @@

BlockNumber = None
## GLOBAL METRICS
LastBlockNumber = None
Ping = None
LastBlockHash = None
LastBlockTimestamp = None
BlockTimeDifference = None
TPS = None
CommitTransacionCount = None
CommitTransacionCount = 0
CustodianStats = None
DepositCount = 0
DepositCapacity = 0
WithdrawalCount = 0
WithdrawalCapacity = 0

TaskLock = threading.Lock()

def get_gw_stat_by_lock(lock_name, gw_rpc: GodwokenRpc, block_hash,
ckb_rpc: CkbRpc, gw_config):
Expand Down Expand Up @@ -72,6 +71,61 @@ def get_gw_stat_by_lock(lock_name, gw_rpc: GodwokenRpc, block_hash,
return (len(output_dict), sum(output_dict.values()))


def update_metrics(tip_number, ping, last_block_hash: str,
                   last_block_ts: int,
                   block_time_diff: int,
                   tps,
                   tx_cnt: int,
                   deposit_cnt, deposit_capacity,
                   withdrawal_cnt, withdrawal_capacity):
    """Publish the metrics of one processed block in a single locked step.

    The first six arguments overwrite the corresponding module-level
    values (block number, ping, block hash, block timestamp, block time
    difference, TPS). The remaining five are *added* to the module-level
    running totals (committed transactions, deposit count/capacity,
    withdrawal count/capacity), which keep growing until `reset_metrics`
    zeroes them.

    Everything happens while holding `TaskLock`, so the whole batch is
    applied together with respect to other `TaskLock` holders.
    """
    global BlockNumber, Ping, LastBlockHash, LastBlockTimestamp
    global BlockTimeDifference, TPS
    global CommitTransacionCount
    global DepositCount, DepositCapacity
    global WithdrawalCount, WithdrawalCapacity

    with TaskLock:
        # Latest-value metrics: replaced on every call.
        (BlockNumber, Ping, LastBlockHash,
         LastBlockTimestamp, BlockTimeDifference, TPS) = (
            tip_number, ping, last_block_hash,
            last_block_ts, block_time_diff, tps)

        # Cumulative metrics: accumulated across calls.
        CommitTransacionCount += tx_cnt
        DepositCount += deposit_cnt
        DepositCapacity += deposit_capacity
        WithdrawalCount += withdrawal_cnt
        WithdrawalCapacity += withdrawal_capacity


def reset_metrics():
    """Zero the cumulative counters while holding `TaskLock`.

    Clears the committed-transaction total and the deposit/withdrawal
    count and capacity totals so that accumulation starts over. The
    latest-value metrics (block number, ping, block hash, timestamps,
    TPS) are not touched.
    """
    global CommitTransacionCount, DepositCount, DepositCapacity
    global WithdrawalCount, WithdrawalCapacity

    with TaskLock:
        CommitTransacionCount = DepositCount = DepositCapacity = 0
        WithdrawalCount = WithdrawalCapacity = 0

"""
General metrics job.
"""
class JobThread(threading.Thread):

def __init__(self):
Expand Down Expand Up @@ -110,85 +164,106 @@ def __init__(self):
)
self.gw_config = testnet_config()

"""
Always try to get new tip.
"""
def run(self):
global BlockNumber
global LastBlockNumber
global Ping
## remove Web3Version
##global Web3Version
global LastBlockHash
global LastBlockTimestamp
global BlockTimeDifference
global TPS
global CommitTransacionCount
global CustodianStats
global DepositCount
global DepositCapacity
global WithdrawalCount
global WithdrawalCapacity

while True:
sleep(5)
sleep(1)
logging.info("Start running")
if BlockNumber is None:
try:
LastBlockNumber = self.gw_rpc.get_tip_number()
tip_number = self.gw_rpc.get_tip_number()
except:
logging.exception("Cannot get tip number")
continue
else:
LastBlockNumber = BlockNumber
tip_number = BlockNumber + 1

try:
Ping = self.gw_rpc.ping()
ping = self.gw_rpc.ping()

block = self.gw_rpc.get_block_by_number(hex(LastBlockNumber))
LastBlockHash = block['hash']
LastBlockTimestamp = convert_int(block['raw']['timestamp'])
CommitTransacionCount = len(block['transactions'])
block = self.gw_rpc.get_block_by_number(hex(tip_number))
last_block_hash = block['hash']
last_block_ts = convert_int(block['raw']['timestamp'])
tx_cnt = len(block['transactions'])

previous_block_hash = block['raw']['parent_block_hash']
previous_block = self.gw_rpc.get_block(previous_block_hash)
previous_block_time = convert_int(previous_block['block']['raw']['timestamp'])
BlockTimeDifference = abs(LastBlockTimestamp - previous_block_time)
block_time_diff = abs(last_block_ts - previous_block_time)

TPS = CommitTransacionCount / BlockTimeDifference * 1000
except Exception as e:
## ignore any exception
logging.error("get block info failed", exc_info=e)
if block_time_diff == 0:
tps = 0
else:
tps = tx_cnt / block_time_diff * 1000
except:
logging.exception("get block info failed")
continue
one_ckb = 100_000_000
if DISABLE_CUSTODIAN_STATS not in os.environ:
logging.info("Loading custodian stats")
try:
CustodianStats = get_custodian(
self.ckb_indexer_url, self.gw_config,
LastBlockNumber)
except:
logging.exception("Failed to get custodian stats")
logging.info("Loading deposit stats")
try:
DepositCount, DepositCapacity = get_gw_stat_by_lock(
deposit_cnt, deposit_capacity = get_gw_stat_by_lock(
"deposit_lock", self.gw_rpc,
LastBlockHash, self.ckb_rpc,
last_block_hash, self.ckb_rpc,
self.gw_config)
DepositCapacity = DepositCapacity / one_ckb
deposit_capacity = deposit_capacity / one_ckb
except:
logging.exception("Failed to get deposit stats")
continue
logging.info("Loading withdrawal stats")
try:
WithdrawalCount, WithdrawalCapacity = get_gw_stat_by_lock(
withdrawal_cnt, withdrawal_capacity = get_gw_stat_by_lock(
"withdrawal_lock", self.gw_rpc,
LastBlockHash, self.ckb_rpc,
last_block_hash, self.ckb_rpc,
self.gw_config)
WithdrawalCapacity = WithdrawalCapacity / one_ckb
withdrawal_capacity = withdrawal_capacity / one_ckb
except:
logging.exception("Failed to get withdrawal stats")
continue
update_metrics(
tip_number,
ping,
last_block_hash,
last_block_ts,
block_time_diff,
tps,
tx_cnt,
deposit_cnt,
deposit_capacity,
withdrawal_cnt,
withdrawal_capacity)


class CustodianJobThread(JobThread):
    """Slow background job that refreshes the custodian statistics.

    Custodian metrics take much longer to compute than the other metrics
    and do not need to be real-time, so they are refreshed in their own
    thread instead of the main metrics loop. Construction is inherited
    unchanged from `JobThread`.
    """

    def run(self):
        """Recompute `CustodianStats` every 10 seconds, forever.

        Failures are logged and the loop simply tries again on the next
        tick; the previously stored `CustodianStats` is left in place.
        """
        global CustodianStats
        while True:
            sleep(10)
            logging.info("Loading custodian stats")
            try:
                # NOTE(review): BlockNumber starts out as None until the
                # first block has been processed -- confirm get_custodian
                # tolerates that (otherwise the failure is logged below).
                CustodianStats = get_custodian(
                    self.ckb_indexer_url, self.gw_config,
                    BlockNumber)
            except Exception:
                # Catch Exception rather than using a bare `except:` so
                # SystemExit / KeyboardInterrupt are not swallowed by this
                # endless loop.
                logging.exception("Failed to get custodian stats")


# Start the general metrics worker; its run() loop keeps polling for new
# blocks in the background.
job = JobThread()
job.start()

# Start the separate, slower worker that refreshes the custodian stats.
custodian_job = CustodianJobThread()
custodian_job.start()

@NodeFlask.route("/metrics/godwoken/<block_number>")
@NodeFlask.route(
Expand All @@ -197,7 +272,8 @@ def run(self):
)
def exporter(block_number=None):
global BlockNumber
BlockNumber = block_number
if block_number is not None:
BlockNumber = block_number
registry = CollectorRegistry(auto_describe=False)

last_block_number = Gauge("Node_Get_LastBlockNumber",
Expand Down Expand Up @@ -319,16 +395,15 @@ def exporter(block_number=None):
registry=registry,
)

global LastBlockNumber
last_block_number.labels(gw_rpc_url=gw_rpc_url).set(LastBlockNumber)
last_block_number.labels(gw_rpc_url=gw_rpc_url).set(BlockNumber)

node_gw_ping.labels(gw_rpc_url=gw_rpc_url,
gw_ping=Ping).set(1)

node_LastBlockInfo.labels(
gw_rpc_url=gw_rpc_url,
last_block_hash=LastBlockHash,
last_blocknumber=LastBlockNumber,
last_blocknumber=BlockNumber,
last_block_timestamp=LastBlockTimestamp,
).set(BlockTimeDifference)

Expand Down Expand Up @@ -369,5 +444,6 @@ def exporter(block_number=None):
gw_withdrawal_cnt.labels(gw_rpc_url=gw_rpc_url).set(WithdrawalCount)
gw_withdrawal_capacity.labels(gw_rpc_url=gw_rpc_url).set(WithdrawalCapacity)

reset_metrics()
return Response(prometheus_client.generate_latest(registry),
mimetype="text/plain")
1 change: 1 addition & 0 deletions agent/godwoken_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def get_block(self, block_hash):

def get_tip_number(self):
    """Return the number of the current tip block.

    Looks up the tip block hash, fetches that block, and returns its
    `block.raw.number` field passed through `convert_int`.
    """
    # Imported locally so this method does not depend on a module-level
    # `logging` import being present.
    import logging

    tip_block_hash = self.get_tip_block_hash()
    # Report through logging instead of a bare print() so the output
    # follows the application's logging configuration.
    logging.info("tip block hash: %s", tip_block_hash)
    tip = self.get_block(tip_block_hash)
    return convert_int(tip['block']['raw']['number'])
Expand Down
1 change: 0 additions & 1 deletion agent/json_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def submit(self, method: str, params: List):
r.raise_for_status()
return r.json()["result"]
except Exception as e:
print("test")
logging.error(f"Submit request to {self.url}, method: {method}, params: {params}", exc_info = e)
raise RPCException from None

Expand Down

0 comments on commit ee41cb8

Please sign in to comment.