
Commit

wip
vgorkavenko committed Jan 27, 2025
1 parent e1b4ad0 commit 305b953
Showing 4 changed files with 133 additions and 56 deletions.
102 changes: 58 additions & 44 deletions src/modules/csm/checkpoint.py
@@ -10,11 +10,11 @@
from src.metrics.prometheus.csm import CSM_UNPROCESSED_EPOCHS_COUNT, CSM_MIN_UNPROCESSED_EPOCH
from src.modules.csm.state import State
from src.providers.consensus.client import ConsensusClient
from src.providers.consensus.types import BlockAttestation, BlockDetailsResponse, SyncCommittee, ProposerDuties
from src.providers.consensus.types import BlockAttestation, SyncCommittee, SyncAggregate
from src.types import BlockRoot, BlockStamp, EpochNumber, SlotNumber, ValidatorIndex
from src.utils.blockstamp import build_blockstamp
from src.utils.range import sequence
from src.utils.slot import get_next_non_missed_slot, get_blockstamp
from src.utils.slot import get_next_non_missed_slot
from src.utils.timeit import timeit
from src.utils.web3converter import Web3Converter

@@ -104,7 +104,9 @@ def _is_min_step_reached(self):
return False


type SlotBlockRoot = tuple[SlotNumber, BlockRoot | None]
type AttestationCommittees = dict[tuple[str, str], list[ValidatorDuty]]
type SyncCommittees = dict[SlotNumber, list[ValidatorDuty]]


class FrameCheckpointProcessor:
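
For orientation, a minimal sketch of the shapes behind the new aliases. The key layout ((slot, committee index) strings for attestation committees, slot numbers for sync committees) is an assumption drawn from how the module uses them, and ValidatorDuty is stubbed here purely for illustration.

# Hypothetical illustration only; the real ValidatorDuty lives in the module.
from dataclasses import dataclass

@dataclass
class ValidatorDuty:
    index: int
    included: bool = False

# AttestationCommittees: assumed to be keyed by (slot, committee index) as strings.
att_committees: dict[tuple[str, str], list[ValidatorDuty]] = {
    ("6400", "0"): [ValidatorDuty(index=42)],
}

# SyncCommittees: one duty list per non-missed slot of the duty epoch.
sync_committees: dict[int, list[ValidatorDuty]] = {
    6400: [ValidatorDuty(index=42)],
}
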
@@ -151,7 +153,7 @@ def _get_block_roots(self, checkpoint_slot: SlotNumber):

def _select_block_roots(
self, duty_epoch: EpochNumber, block_roots: list[BlockRoot | None], checkpoint_slot: SlotNumber
) -> list[tuple[SlotNumber, BlockRoot | None]]:
) -> tuple[list[SlotBlockRoot], list[SlotBlockRoot]]:
roots_to_check = []
# To check duties in the current epoch, we need both the 32 slots of the current epoch
# and the 32 slots of the next epoch
@@ -166,13 +168,19 @@ def _select_block_roots(
raise ValueError("Slot is out of the state block roots range")
roots_to_check.append((slot_to_check, block_roots[slot_to_check % SLOTS_PER_HISTORICAL_ROOT]))

return roots_to_check
duty_epoch_roots, next_epoch_roots = roots_to_check[:32], roots_to_check[32:]

def _process(self, unprocessed_epochs: list[EpochNumber], duty_epochs_roots: dict[EpochNumber, list[tuple[SlotNumber, BlockRoot | None]]]):
return duty_epoch_roots, next_epoch_roots
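
A standalone sketch of the splitting step above, assuming 32 slots per epoch (mainnet preset); it is an approximation of _select_block_roots, not the module code itself.

SLOTS_PER_EPOCH = 32  # assumption: mainnet preset

def split_roots(roots_to_check: list[tuple[int, bytes | None]]):
    # First 32 (slot, root) pairs belong to the duty epoch, the next 32 to the following epoch.
    return roots_to_check[:SLOTS_PER_EPOCH], roots_to_check[SLOTS_PER_EPOCH:]

# Example: slots 6400..6463 (epochs 200 and 201) with two missed slots.
pairs = [(slot, None if slot in (6410, 6450) else b"\x00" * 32) for slot in range(6400, 6464)]
duty_roots, next_roots = split_roots(pairs)
assert len(duty_roots) == len(next_roots) == SLOTS_PER_EPOCH
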

def _process(
self,
unprocessed_epochs: list[EpochNumber],
duty_epochs_roots: dict[EpochNumber, tuple[list[SlotBlockRoot], list[SlotBlockRoot]]]
):
executor = ThreadPoolExecutor(max_workers=variables.CSM_ORACLE_MAX_CONCURRENCY)
try:
futures = {
executor.submit(self._check_duty, duty_epoch, duty_epochs_roots[duty_epoch])
executor.submit(self._check_duties, duty_epoch, *duty_epochs_roots[duty_epoch])
for duty_epoch in unprocessed_epochs
}
for future in as_completed(futures):
@@ -186,26 +194,30 @@ def _process(self, unprocessed_epochs: list[EpochNumber], duty_epochs_roots: dic
logger.info({"msg": "The executor was shut down"})

@timeit(lambda args, duration: logger.info({"msg": f"Epoch {args.duty_epoch} processed in {duration:.2f} seconds"}))
def _check_duty(
def _check_duties(
self,
duty_epoch: EpochNumber,
two_epochs_block_roots: list[tuple[SlotNumber, BlockRoot | None]],
duty_epoch_roots: list[SlotBlockRoot],
next_epoch_roots: list[SlotBlockRoot],
):

logger.info({"msg": f"Processing epoch {duty_epoch}"})

# TODO: use get_block_details instead of get_block_attestations
att_committees = self._prepare_att_committees(EpochNumber(duty_epoch))
for slot, root in two_epochs_block_roots:
for slot, root in [*duty_epoch_roots, *next_epoch_roots]:
missed = root is None
if not missed:
# TODO: should we use get_block_details here?
attestations = self.cc.get_block_attestations(BlockRoot(root))
process_attestations(attestations, att_committees)

duty_epoch_block_roots = two_epochs_block_roots[:32]
propose_duties = self._prepare_propose_duties(EpochNumber(duty_epoch))
sync_committee = self._prepare_sync_committee(EpochNumber(duty_epoch), duty_epoch_block_roots)
process_sync(sync_committee, duty_epoch_block_roots)
process_proposals(propose_duties, duty_epoch_block_roots)
sync_committees = self._prepare_sync_committee(EpochNumber(duty_epoch), duty_epoch_roots)
for slot, root in duty_epoch_roots:
missed = root is None
if not missed:
propose_duties[slot].included = True
sync_aggregate = self.cc.get_block_details(BlockRoot(root)).message.body.sync_aggregate
process_sync(slot, sync_aggregate, sync_committees)

with lock:
if duty_epoch not in self.state.unprocessed_epochs:
@@ -217,14 +229,19 @@ def _check_duty(
att_duty.index,
included=att_duty.included,
)
for sync_duty in sync_committee:
self.state.increment_sync_duty(
for sync_committee in sync_committees.values():
for sync_duty in sync_committee:
self.state.increment_sync_duty(
duty_epoch,
sync_duty.index,
included=sync_duty.included,
)
for proposer_duty in propose_duties.values():
self.state.increment_prop_duty(
duty_epoch,
sync_duty.index,
included=sync_duty.included,
proposer_duty.index,
included=proposer_duty.included
)
for proposer_duty in propose_duties.values():
self.state.increment_prop_duty(duty_epoch, proposer_duty.index, proposer_duty.included)
self.state.add_processed_epoch(duty_epoch)
self.state.log_progress()
unprocessed_epochs = self.state.unprocessed_epochs
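
The increment_* calls above feed per-validator accumulators in the state. The sketch below shows what such an accumulator could look like and how its perf ratio falls out; it is a simplified stand-in, and the real DutyAccumulator in src/modules/csm/state.py may differ.

from dataclasses import dataclass

@dataclass
class DutyAccumulator:  # simplified stand-in
    assigned: int = 0
    included: int = 0

    @property
    def perf(self) -> float:
        # Share of assigned duties that were actually included on chain.
        return self.included / self.assigned if self.assigned else 0.0

    def add_duty(self, included: bool) -> None:
        self.assigned += 1
        self.included += int(included)

acc = DutyAccumulator()
for ok in (True, True, False):
    acc.add_duty(ok)
assert abs(acc.perf - 2 / 3) < 1e-9
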
@@ -252,8 +269,8 @@ def _prepare_att_committees(self, epoch: EpochNumber) -> AttestationCommittees:
)
)
def _prepare_sync_committee(
self, epoch: EpochNumber, block_roots: list[tuple[SlotNumber, BlockRoot | None]]
) -> list[str]: # TODO: list ValidatorIndex
self, epoch: EpochNumber, block_roots: list[SlotBlockRoot]
) -> dict[SlotNumber, list[ValidatorDuty]]:
sync_committee_epochs = epoch % EPOCHS_PER_SYNC_COMMITTEE_PERIOD
if not self.current_sync_committee or sync_committee_epochs == 0:
for_epochs = EPOCHS_PER_SYNC_COMMITTEE_PERIOD - sync_committee_epochs
@@ -269,7 +286,16 @@ def _prepare_sync_committee(
)
# TODO: can we use lru cache here?
self.current_sync_committee = self.cc.get_sync_committee(blockstamp, epoch)
return self.current_sync_committee.validators
duties = {}
for slot, root in block_roots:
if root is None:
continue
duties[slot] = [
ValidatorDuty(index=ValidatorIndex(int(validator)), included=False)
for validator in self.current_sync_committee.validators
]

return duties
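
The caching branch above only refetches the sync committee at period boundaries. A small worked check of that condition, assuming the mainnet preset EPOCHS_PER_SYNC_COMMITTEE_PERIOD = 256:

EPOCHS_PER_SYNC_COMMITTEE_PERIOD = 256  # assumption: mainnet preset

def needs_refetch(epoch: int, have_cached: bool) -> bool:
    # Mirrors the condition in _prepare_sync_committee: refetch when nothing is cached
    # yet or when the epoch starts a new sync committee period.
    return not have_cached or epoch % EPOCHS_PER_SYNC_COMMITTEE_PERIOD == 0

assert needs_refetch(513, have_cached=False)      # nothing cached yet
assert needs_refetch(768, have_cached=True)       # 768 % 256 == 0, new period
assert not needs_refetch(769, have_cached=True)   # same period, cache reused
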

@timeit(
lambda args, duration: logger.info(
@@ -279,7 +305,9 @@ def _prepare_sync_committee(
def _prepare_propose_duties(self, epoch: EpochNumber) -> dict[SlotNumber, ValidatorDuty]:
duties = {}
for duty in self.cc.get_proposer_duties(epoch):
duties[SlotNumber(int(duty.slot))] = ValidatorDuty(index=ValidatorIndex(int(duty.validator_index)), included=False)
duties[SlotNumber(int(duty.slot))] = ValidatorDuty(
index=ValidatorIndex(int(duty.validator_index)), included=False
)
return duties


@@ -292,25 +320,11 @@ def process_attestations(attestations: Iterable[BlockAttestation], committees: A
validator_duty.included = validator_duty.included or _is_attested(att_bits, index_in_committee)


def process_sync(committee: list[str], block_roots: list[tuple[SlotNumber, BlockRoot | None]]):
duties = {}
for slot, root in block_roots:
if root is None:
continue
sync_bits = self.cc.get_block_details(BlockRoot(root)).message.body.sync_aggregate.sync_committee_bits
sync_bits = _to_bits(sync_bits)
for index_in_committee, validator_index in enumerate(committee):
duty = ValidatorDuty(index=ValidatorIndex(int(validator_index)), included=False)
duty.included = duty.included or _is_attested(sync_bits, index_in_committee)
duties[slot] = duty
return duties


def process_proposals(
duties: dict[SlotNumber, ValidatorDuty], block_roots: list[tuple[SlotNumber, BlockRoot | None]]
) -> None:
for slot, root in block_roots:
duties[slot].included = root is not None
def process_sync(slot: SlotNumber, sync_aggregate: SyncAggregate, committees: SyncCommittees) -> None:
committee = committees[slot]
sync_bits = _to_bits(sync_aggregate.sync_committee_bits)
for index_in_committee, validator_duty in enumerate(committee):
validator_duty.included = validator_duty.included or _is_attested(sync_bits, index_in_committee)


def _is_attested(bits: Sequence[bool], index: int) -> bool:
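
To make the bit handling in process_attestations/process_sync concrete, here is a self-contained sketch of marking sync participation from a bitfield. _to_bits and _is_attested are not shown in this diff, so the helpers below are hypothetical stand-ins that follow the same usage.

from dataclasses import dataclass
from typing import Sequence

@dataclass
class ValidatorDuty:  # simplified stand-in
    index: int
    included: bool = False

def is_attested(bits: Sequence[bool], index: int) -> bool:
    # The duty at `index` counts as fulfilled if its participation bit is set.
    return bool(bits[index])

def mark_sync_participation(slot: int, sync_bits: Sequence[bool], committees: dict[int, list[ValidatorDuty]]) -> None:
    # Mirrors the new process_sync: OR the existing flag with this slot's bit.
    for index_in_committee, duty in enumerate(committees[slot]):
        duty.included = duty.included or is_attested(sync_bits, index_in_committee)

committees = {6400: [ValidatorDuty(1), ValidatorDuty(2), ValidatorDuty(3)]}
mark_sync_participation(6400, [True, False, True], committees)
assert [d.included for d in committees[6400]] == [True, False, True]
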
43 changes: 35 additions & 8 deletions src/modules/csm/csm.py
@@ -268,7 +268,12 @@ def _calculate_distribution_in_frame(
frame: Frame,
distributed: int,
):
network_perf = self.state.get_network_aggr(frame).perf
att_network_perf = self.state.get_att_network_aggr(frame).perf
prop_network_perf = self.state.get_prop_network_aggr(frame).perf
sync_network_perf = self.state.get_sync_network_aggr(frame).perf

network_perf = 56/64 * att_network_perf + 8/64 * prop_network_perf + 2/64 * sync_network_perf

threshold = network_perf - self.w3.csm.oracle.perf_leeway_bp(blockstamp.block_hash) / TOTAL_BASIS_POINTS

# Build the map of the current distribution operators.
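
A worked example of the threshold line above with illustrative numbers: a blended network performance of 0.95 and a 500 bp leeway give a threshold of 0.90, so only validators performing strictly above 0.90 earn a share for the frame.

TOTAL_BASIS_POINTS = 10_000
network_perf = 0.95      # illustrative blended network performance
perf_leeway_bp = 500     # illustrative leeway value from the oracle contract
threshold = network_perf - perf_leeway_bp / TOTAL_BASIS_POINTS
assert abs(threshold - 0.90) < 1e-12
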
@@ -277,30 +282,52 @@ def _calculate_distribution_in_frame(
log = FramePerfLog(blockstamp, frame, threshold)

for (_, no_id), validators in operators_to_validators.items():
# TODO: Do we need to check it later to have other data in logs?
if no_id in stuck_operators:
log.operators[no_id].stuck = True
continue

for v in validators:
aggr = self.state.att_data[frame].get(ValidatorIndex(int(v.index)))
att_aggr = self.state.att_data[frame].get(ValidatorIndex(int(v.index)))
prop_aggr = self.state.prop_data[frame].get(ValidatorIndex(int(v.index)))
sync_aggr = self.state.sync_data[frame].get(ValidatorIndex(int(v.index)))

if aggr is None:
if att_aggr is None:
# It's possible that the validator is not assigned to any duty, hence its performance
# is not present in the aggregates (e.g. exited, pending activation, etc.).
# TODO: do we need to check sync_aggr to strike the validator?
continue

log_data = log.operators[no_id].validators[v.index]

if v.validator.slashed is True:
# It means that the validator was active during the frame, got slashed, and did not reach its exit
# epoch, so we should not count such a validator towards the operator's share.
log.operators[no_id].validators[v.index].slashed = True
log_data.slashed = True
continue

if aggr.perf > threshold:
performance = att_aggr.perf

if prop_aggr is not None and sync_aggr is not None:
performance = 56/64 * att_aggr.perf + 8/64 * prop_aggr.perf + 2/64 * sync_aggr.perf

if prop_aggr is not None and sync_aggr is None:
performance = 56/62 * att_aggr.perf + 8/62 * prop_aggr.perf

if prop_aggr is None and sync_aggr is not None:
performance = 54 / 56 * att_aggr.perf + 2 / 56 * sync_aggr.perf

if performance > threshold:
# The count of assigned attestations is used as a metric of the time
# the validator was active in the current frame.
distribution[no_id] += aggr.assigned

log.operators[no_id].validators[v.index].perf = aggr
distribution[no_id] += att_aggr.assigned

log_data.performance = performance
log_data.attestations = att_aggr
if prop_aggr is not None:
log_data.proposals = prop_aggr
if sync_aggr is not None:
log_data.sync_committee = sync_aggr

# Calculate share of each CSM node operator.
shares = defaultdict[NodeOperatorId, int](int)
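
The branching above blends per-duty performances with fixed weights and renormalizes when a component is missing. A compact, hypothetical way to express the same idea is sketched below; the weights follow the consensus-spec reward weights (attestations 14 + 26 + 14 = 54, proposer 8, sync 2, out of 64) and are an assumption, not necessarily the constants the module settles on.

ATT_WEIGHT, PROP_WEIGHT, SYNC_WEIGHT = 54, 8, 2  # assumption: Altair reward weights

def blended_performance(att: float, prop: float | None, sync: float | None) -> float:
    # Weighted average of the available duty performances; components the validator
    # had no duty for are dropped and the remaining weights are renormalized.
    parts = [(ATT_WEIGHT, att)]
    if prop is not None:
        parts.append((PROP_WEIGHT, prop))
    if sync is not None:
        parts.append((SYNC_WEIGHT, sync))
    total_weight = sum(weight for weight, _ in parts)
    return sum(weight * perf for weight, perf in parts) / total_weight

assert blended_performance(1.0, None, None) == 1.0
assert abs(blended_performance(0.9, 1.0, None) - (54 * 0.9 + 8 * 1.0) / 62) < 1e-12
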
6 changes: 4 additions & 2 deletions src/modules/csm/log.py
@@ -12,8 +12,10 @@ class LogJSONEncoder(json.JSONEncoder): ...

@dataclass
class ValidatorFrameSummary:
# TODO: Should be renamed. Perf means different things in different contexts
perf: DutyAccumulator = field(default_factory=DutyAccumulator)
attestations: DutyAccumulator = field(default_factory=DutyAccumulator)
proposals: DutyAccumulator = field(default_factory=DutyAccumulator)
sync_committee: DutyAccumulator = field(default_factory=DutyAccumulator)
performance: float = 0.0
slashed: bool = False


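
A hypothetical example of how one per-validator entry of the frame log could look with the new fields; the values are made up and DutyAccumulator is stubbed with just assigned/included counters.

import json
from dataclasses import asdict, dataclass, field

@dataclass
class DutyAccumulator:  # simplified stand-in for the accumulator in state.py
    assigned: int = 0
    included: int = 0

@dataclass
class ValidatorFrameSummary:  # mirrors the updated dataclass above
    attestations: DutyAccumulator = field(default_factory=DutyAccumulator)
    proposals: DutyAccumulator = field(default_factory=DutyAccumulator)
    sync_committee: DutyAccumulator = field(default_factory=DutyAccumulator)
    performance: float = 0.0
    slashed: bool = False

summary = ValidatorFrameSummary(
    attestations=DutyAccumulator(assigned=32, included=31),
    proposals=DutyAccumulator(assigned=1, included=1),
    performance=0.97,
)
print(json.dumps(asdict(summary), indent=2))
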
38 changes: 36 additions & 2 deletions src/modules/csm/state.py
@@ -224,12 +224,12 @@ def calculate_frames(epochs_to_process: tuple[EpochNumber, ...], epochs_per_fram
frames.append((frame_epochs[0], frame_epochs[-1]))
return frames

def get_network_aggr(self, frame: Frame) -> DutyAccumulator:
def get_att_network_aggr(self, frame: Frame) -> DutyAccumulator:
# TODO: exclude `active_slashed` validators from the calculation
included = assigned = 0
frame_data = self.att_data.get(frame)
if not frame_data:
raise ValueError(f"No data for frame {frame} to calculate network aggregate")
raise ValueError(f"No data for frame {frame} to calculate attestations network aggregate")
for validator, acc in frame_data.items():
if acc.included > acc.assigned:
raise ValueError(f"Invalid accumulator: {validator=}, {acc=}")
@@ -241,3 +241,37 @@ def get_network_aggr(self, frame: Frame) -> DutyAccumulator:
)
logger.info({"msg": "Network attestations aggregate computed", "value": repr(aggr), "avg_perf": aggr.perf})
return aggr

def get_sync_network_aggr(self, frame: Frame) -> DutyAccumulator:
included = assigned = 0
frame_data = self.sync_data.get(frame)
if not frame_data:
raise ValueError(f"No data for frame {frame} to calculate syncs network aggregate")
for validator, acc in frame_data.items():
if acc.included > acc.assigned:
raise ValueError(f"Invalid accumulator: {validator=}, {acc=}")
included += acc.included
assigned += acc.assigned
aggr = DutyAccumulator(
included=included,
assigned=assigned,
)
logger.info({"msg": "Network syncs aggregate computed", "value": repr(aggr), "avg_perf": aggr.perf})
return aggr

def get_prop_network_aggr(self, frame: Frame) -> DutyAccumulator:
included = assigned = 0
frame_data = self.prop_data.get(frame)
if not frame_data:
raise ValueError(f"No data for frame {frame} to calculate proposal network aggregate")
for validator, acc in frame_data.items():
if acc.included > acc.assigned:
raise ValueError(f"Invalid accumulator: {validator=}, {acc=}")
included += acc.included
assigned += acc.assigned
aggr = DutyAccumulator(
included=included,
assigned=assigned,
)
logger.info({"msg": "Network proposal aggregate computed", "value": repr(aggr), "avg_perf": aggr.perf})
return aggr
