Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored exit signature rotation #192

Merged
merged 6 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
817 changes: 416 additions & 401 deletions poetry.lock

Large diffs are not rendered by default.

13 changes: 5 additions & 8 deletions src/commands/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,6 @@ async def main() -> None:
if settings.enable_metrics:
await metrics_server()

# process outdated exit signatures
asyncio.create_task(
update_exit_signatures_periodically(
keystores=keystores,
remote_signer_config=remote_signer_config,
)
)

logger.info('Started operator service')
with InterruptHandler() as interrupt_handler:
while not interrupt_handler.exit:
Expand Down Expand Up @@ -333,6 +325,11 @@ async def main() -> None:
if settings.harvest_vault:
await harvest_vault_task()

# process outdated exit signatures
await update_exit_signatures_periodically(
keystores=keystores,
remote_signer_config=remote_signer_config,
)
# check balance
await check_hot_wallet_balance()

Expand Down
24 changes: 6 additions & 18 deletions src/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import asyncio
import logging
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
Expand All @@ -9,7 +7,7 @@
from web3 import Web3
from web3.types import Timestamp, Wei

from src.common.clients import consensus_client, execution_client
from src.common.clients import consensus_client
from src.common.exceptions import (
InvalidOraclesRequestError,
NotEnoughOracleApprovalsError,
Expand Down Expand Up @@ -44,21 +42,11 @@ def log_verbose(e: Exception):
logger.error(repr(e))


async def wait_block_finalization(block_number: BlockNumber | None = None):
block_number = block_number or await execution_client.eth.get_block_number()
chain_head = None
sleep_time = 0.0

while not chain_head or chain_head.execution_block < block_number:
await asyncio.sleep(sleep_time)
start = time.time()

chain_head = await consensus_client.get_chain_finalized_head(
settings.network_config.SLOTS_PER_EPOCH
)

elapsed = time.time() - start
sleep_time = float(settings.network_config.SECONDS_PER_BLOCK) - elapsed
async def is_block_finalized(block_number: BlockNumber) -> bool:
chain_head = await consensus_client.get_chain_finalized_head(
settings.network_config.SLOTS_PER_EPOCH
)
return chain_head.execution_block >= block_number


def get_current_timestamp() -> Timestamp:
Expand Down
5 changes: 0 additions & 5 deletions src/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,3 @@ def network_config(self) -> NetworkConfig:

# Hashi vault timeout
HASHI_VAULT_TIMEOUT = 10

# Oracles signature update sync (10 minutes)
ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT = 600
# How often to pull update for oracle signature update (every minute)
ORACLE_SIGNATURE_UPDATE_SYNC_DELAY = 60
128 changes: 31 additions & 97 deletions src/exits/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import asyncio
import logging
import random
import time
from random import shuffle

from eth_typing import BlockNumber, BLSPubkey
from web3 import Web3
Expand All @@ -12,12 +10,8 @@
from src.common.execution import get_oracles
from src.common.metrics import metrics
from src.common.typings import Oracles
from src.common.utils import get_current_timestamp, wait_block_finalization
from src.config.settings import (
ORACLE_SIGNATURE_UPDATE_SYNC_DELAY,
ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
settings,
)
from src.common.utils import get_current_timestamp, is_block_finalized
from src.config.settings import settings
from src.exits.consensus import get_validator_public_keys
from src.exits.execution import submit_exit_signatures
from src.exits.typings import SignatureRotationRequest
Expand All @@ -39,99 +33,39 @@ async def update_exit_signatures_periodically(
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
):
# Oracle may have lag if operator was stopped
# during `update_exit_signatures_periodically` process.
# Wait oracles sync.
oracles = await get_oracles()
await _wait_oracles_signature_update(oracles)

while True:
timer_start = time.time()

try:
oracles = await get_oracles()

oracle_replicas = random.choice(oracles.endpoints) # nosec
oracle_endpoint = random.choice(oracle_replicas) # nosec
outdated_indexes = await _fetch_outdated_indexes(oracle_endpoint)

if outdated_indexes:
await _update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)

# Wait oracles sync.
await _wait_oracles_signature_update(oracles)
except Exception as e:
logger.exception(e)

elapsed = time.time() - timer_start
await asyncio.sleep(float(settings.network_config.SECONDS_PER_BLOCK) - elapsed)


async def _fetch_outdated_indexes(oracle_endpoint) -> list[int]:
response = await get_oracle_outdated_signatures_response(oracle_endpoint)
outdated_indexes = [val['index'] for val in response['validators']]

metrics.outdated_signatures.set(len(outdated_indexes))
return outdated_indexes


async def _wait_oracles_signature_update(oracles: Oracles) -> None:
last_event = await keeper_contract.get_exit_signatures_updated_event(vault=settings.vault)
if not last_event:
update_block = await _fetch_last_update_block()
if update_block and not await is_block_finalized(update_block):
logger.info('Signatures update block %d has not finalized yet', update_block)
return
update_block = BlockNumber(last_event['blockNumber'])

logger.info('Waiting for block %d finalization...', update_block)
await wait_block_finalization(update_block)

oracle_tasks = {
asyncio.create_task(
_wait_oracle_signature_update(
exit_signature_update_block=update_block,
oracle_endpoint=endpoint,
max_time=ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
)
outdated_indexes = await _fetch_outdated_indexes(oracles, update_block)
if outdated_indexes:
await _update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)
for replicas in oracles.endpoints
for endpoint in replicas
}
while oracle_tasks:
done, oracle_tasks = await asyncio.wait(oracle_tasks, return_when=asyncio.FIRST_COMPLETED)
if done:
for pending_task in oracle_tasks:
pending_task.cancel()
logger.info('Oracles have fetched exit signatures update')


async def _wait_oracle_signature_update(
exit_signature_update_block: BlockNumber, oracle_endpoint: str, max_time: int | float = 0
) -> None:
"""
Wait the oracle `oracle_endpoint` reads and processes `ExitSignatureUpdate` event
in the block `exit_signature_update_block`.
"""
elapsed = 0.0
start_time = time.time()

while elapsed <= max_time:
oracle_block = await _fetch_exit_signature_block(oracle_endpoint)
if oracle_block and oracle_block >= exit_signature_update_block:
return

logger.info(
'Waiting for %s to sync block %d...', oracle_endpoint, exit_signature_update_block
)
await asyncio.sleep(ORACLE_SIGNATURE_UPDATE_SYNC_DELAY)
elapsed = time.time() - start_time

raise asyncio.TimeoutError(
f'Timeout exceeded for wait_oracle_signature_block_update for {oracle_endpoint}'
)
async def _fetch_last_update_block() -> BlockNumber | None:
last_event = await keeper_contract.get_exit_signatures_updated_event(vault=settings.vault)
if last_event:
return BlockNumber(last_event['blockNumber'])
return None


async def _fetch_outdated_indexes(oracles: Oracles, update_block: BlockNumber | None) -> list[int]:
endpoints = [endpoint for replicas in oracles.endpoints for endpoint in replicas]
shuffle(endpoints)

for oracle_endpoint in endpoints:
response = await get_oracle_outdated_signatures_response(oracle_endpoint)
if not update_block or response['exit_signature_block_number'] >= update_block:
outdated_indexes = [val['index'] for val in response['validators']]
metrics.outdated_signatures.set(len(outdated_indexes))
return outdated_indexes
raise RuntimeError('Oracles have not synced exit signatures yet')


async def _update_exit_signatures(
Expand Down
35 changes: 2 additions & 33 deletions src/exits/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,20 @@
import asyncio
from pathlib import Path
from random import randint
from typing import Callable
from unittest import mock

import pytest
from eth_typing import BlockNumber, ChecksumAddress
from eth_typing import ChecksumAddress
from sw_utils.typings import ConsensusFork

from src.common.typings import Oracles
from src.common.utils import get_current_timestamp
from src.config.settings import settings
from src.exits.tasks import _get_oracles_request, _wait_oracle_signature_update
from src.exits.tasks import _get_oracles_request
from src.validators.signing.remote import RemoteSignerConfiguration
from src.validators.typings import ExitSignatureShards, Keystores


@pytest.mark.usefixtures('fake_settings')
class TestWaitOracleSignatureUpdate:
async def test_normal(self):
update_block = BlockNumber(3)
with (
mock.patch('asyncio.sleep'),
mock.patch('src.exits.tasks.time.time', return_value=100),
mock.patch(
'src.exits.tasks._fetch_exit_signature_block', side_effect=[None, 1, 2, 3]
) as fetch_mock,
):
await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)

assert fetch_mock.call_count == 4

async def test_timeout(self):
update_block = BlockNumber(3)
with (
mock.patch('asyncio.sleep'),
mock.patch('src.exits.tasks.time.time', side_effect=[100, 103, 106]),
mock.patch(
'src.exits.tasks._fetch_exit_signature_block', return_value=None
) as fetch_mock,
pytest.raises(asyncio.TimeoutError),
):
await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)

assert fetch_mock.call_count == 2


@pytest.mark.usefixtures('fake_settings')
class TestGetOraclesRequest:
async def test_local_keystores(
Expand Down