From 8ae2364d2c769ceac3344ad0941dbade196d8c2f Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 23 Oct 2020 11:46:58 -0400 Subject: [PATCH 01/22] add validator index to proposer data --- Cargo.lock | 14 +++++++------- beacon_node/http_api/Cargo.toml | 2 +- beacon_node/http_api/src/beacon_proposer_cache.rs | 1 + beacon_node/http_api/src/lib.rs | 2 +- beacon_node/http_api/tests/tests.rs | 2 +- beacon_node/http_metrics/Cargo.toml | 2 +- common/eth2/src/types.rs | 2 ++ common/warp_utils/Cargo.toml | 2 +- consensus/types/src/aggregate_and_proof.rs | 2 +- validator_client/Cargo.toml | 2 +- 10 files changed, 17 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca2397dc7ae..678f23a5912 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1375,8 +1375,7 @@ checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" [[package]] name = "discv5" version = "0.1.0-beta.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "595cf0600428cea17ce274cc11bb2fa8247a900a76fde1bbce2b81a39f335c02" +source = "git+https://github.com/sigp/discv5?rev=fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0#fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0" dependencies = [ "aes-ctr", "aes-gcm", @@ -1408,7 +1407,8 @@ dependencies = [ [[package]] name = "discv5" version = "0.1.0-beta.1" -source = "git+https://github.com/sigp/discv5?rev=fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0#fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595cf0600428cea17ce274cc11bb2fa8247a900a76fde1bbce2b81a39f335c02" dependencies = [ "aes-ctr", "aes-gcm", @@ -3987,8 +3987,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c7ad66970bbab360c97179b60906e2dc4aef1f7fca8ab4e5c5db8c97b16814a" +source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" dependencies = [ "arrayref", "bs58", @@ -4005,7 +4004,8 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.3" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c7ad66970bbab360c97179b60906e2dc4aef1f7fca8ab4e5c5db8c97b16814a" dependencies = [ "arrayref", "bs58", @@ -6783,7 +6783,7 @@ dependencies = [ [[package]] name = "warp" version = "0.2.5" -source = "git+https://github.com/paulhauner/warp?branch=cors-wildcard#a7685b76d70c3e5628e31d60aee510acec3c5c30" +source = "git+https://github.com/realbigsean/warp.git?branch=0.2.x#10441f38b2d47ee6e52afaf93a099aa75342c6b3" dependencies = [ "bytes 0.5.6", "futures 0.3.6", diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index acaea73999c..4cd16493bae 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Paul Hauner "] edition = "2018" [dependencies] -warp = { git = "https://github.com/paulhauner/warp", branch = "cors-wildcard" } +warp = { git = "https://github.com/realbigsean/warp.git", branch = "0.2.x" } serde = { version = "1.0.116", features = ["derive"] } tokio = { version = "0.2.22", features = ["macros"] } parking_lot = "0.11.0" diff --git a/beacon_node/http_api/src/beacon_proposer_cache.rs b/beacon_node/http_api/src/beacon_proposer_cache.rs index b062119e578..1dbcef1f50f 100644 --- a/beacon_node/http_api/src/beacon_proposer_cache.rs +++ b/beacon_node/http_api/src/beacon_proposer_cache.rs @@ -89,6 +89,7 @@ impl BeaconProposerCache { Ok(ProposerData { pubkey: PublicKeyBytes::from(pubkey), + validator_index: i as u64, slot, }) }) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 3aad79f3fa2..48e52495169 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -704,7 +704,7 @@ pub fn serve( * beacon/blocks */ - // POST beacon/blocks/{block_id} + // POST beacon/blocks let post_beacon_blocks = eth1_v1 .and(warp::path("beacon")) .and(warp::path("blocks")) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 0b5154ce225..7a93333195c 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1265,7 +1265,7 @@ impl ApiTester { .unwrap(); let pubkey = state.validators[index].pubkey.clone().into(); - ProposerData { pubkey, slot } + ProposerData { pubkey, validator_index: index as u64, slot } }) .collect::>(); diff --git a/beacon_node/http_metrics/Cargo.toml b/beacon_node/http_metrics/Cargo.toml index 5b1715e689e..19c65969cd6 100644 --- a/beacon_node/http_metrics/Cargo.toml +++ b/beacon_node/http_metrics/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] prometheus = "0.10.0" -warp = { git = "https://github.com/paulhauner/warp", branch = "cors-wildcard" } +warp = { git = "https://github.com/realbigsean/warp.git", branch = "0.2.x" } serde = { version = "1.0.116", features = ["derive"] } slog = "2.5.2" beacon_chain = { path = "../beacon_chain" } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 02fdc5ca4b7..f5bd0274e4a 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -388,6 +388,8 @@ pub struct AttesterData { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ProposerData { pub pubkey: PublicKeyBytes, + #[serde(with = "serde_utils::quoted_u64")] + pub validator_index: u64, pub slot: Slot, } diff --git a/common/warp_utils/Cargo.toml b/common/warp_utils/Cargo.toml index 1fc88ababd8..eed9f780c37 100644 --- a/common/warp_utils/Cargo.toml +++ b/common/warp_utils/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -warp = { git = "https://github.com/paulhauner/warp", branch = "cors-wildcard" } +warp = { git = "https://github.com/realbigsean/warp.git", branch = "0.2.x" } eth2 = { path = "../eth2" } types = { path = "../../consensus/types" } beacon_chain = { path = "../../beacon_node/beacon_chain" } diff --git a/consensus/types/src/aggregate_and_proof.rs b/consensus/types/src/aggregate_and_proof.rs index 52871226107..1c1127ccf6b 100644 --- a/consensus/types/src/aggregate_and_proof.rs +++ b/consensus/types/src/aggregate_and_proof.rs @@ -53,7 +53,7 @@ impl AggregateAndProof { Self { aggregator_index, - aggregate, + aggregate: aggregate, selection_proof, } } diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index a82adeff356..f1c3695d52b 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -52,7 +52,7 @@ eth2_keystore = { path = "../crypto/eth2_keystore" } account_utils = { path = "../common/account_utils" } lighthouse_version = { path = "../common/lighthouse_version" } warp_utils = { path = "../common/warp_utils" } -warp = { git = "https://github.com/paulhauner/warp", branch = "cors-wildcard" } +warp = { git = "https://github.com/realbigsean/warp.git", branch = "0.2.x" } hyper = "0.13.8" serde_utils = { path = "../consensus/serde_utils" } libsecp256k1 = "0.3.5" From 099707f1156d8980cf8161609d2889d607e36832 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 26 Oct 2020 14:43:02 -0400 Subject: [PATCH 02/22] update state id serialization error --- Cargo.lock | 14 +++++++------- beacon_node/http_api/Cargo.toml | 2 +- beacon_node/http_api/src/state_id.rs | 6 ++++-- beacon_node/http_api/tests/tests.rs | 6 +++++- beacon_node/http_metrics/Cargo.toml | 2 +- common/warp_utils/Cargo.toml | 2 +- consensus/types/src/aggregate_and_proof.rs | 2 +- validator_client/Cargo.toml | 2 +- 8 files changed, 21 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 678f23a5912..ca2397dc7ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1375,7 +1375,8 @@ checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" [[package]] name = "discv5" version = "0.1.0-beta.1" -source = "git+https://github.com/sigp/discv5?rev=fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0#fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595cf0600428cea17ce274cc11bb2fa8247a900a76fde1bbce2b81a39f335c02" dependencies = [ "aes-ctr", "aes-gcm", @@ -1407,8 +1408,7 @@ dependencies = [ [[package]] name = "discv5" version = "0.1.0-beta.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "595cf0600428cea17ce274cc11bb2fa8247a900a76fde1bbce2b81a39f335c02" +source = "git+https://github.com/sigp/discv5?rev=fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0#fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0" dependencies = [ "aes-ctr", "aes-gcm", @@ -3987,7 +3987,8 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.3" -source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c7ad66970bbab360c97179b60906e2dc4aef1f7fca8ab4e5c5db8c97b16814a" dependencies = [ "arrayref", "bs58", @@ -4004,8 +4005,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c7ad66970bbab360c97179b60906e2dc4aef1f7fca8ab4e5c5db8c97b16814a" +source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c" dependencies = [ "arrayref", "bs58", @@ -6783,7 +6783,7 @@ dependencies = [ [[package]] name = "warp" version = "0.2.5" -source = "git+https://github.com/realbigsean/warp.git?branch=0.2.x#10441f38b2d47ee6e52afaf93a099aa75342c6b3" +source = "git+https://github.com/paulhauner/warp?branch=cors-wildcard#a7685b76d70c3e5628e31d60aee510acec3c5c30" dependencies = [ "bytes 0.5.6", "futures 0.3.6", diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 4cd16493bae..acaea73999c 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Paul Hauner "] edition = "2018" [dependencies] -warp = { git = "https://github.com/realbigsean/warp.git", branch = "0.2.x" } +warp = { git = "https://github.com/paulhauner/warp", branch = "cors-wildcard" } serde = { version = "1.0.116", features = ["derive"] } tokio = { version = "0.2.22", features = ["macros"] } parking_lot = "0.11.0" diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index 11800648f25..7a9a3f34df9 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -110,9 +110,11 @@ impl StateId { } impl FromStr for StateId { - type Err = String; + type Err = warp::Rejection; fn from_str(s: &str) -> Result { - CoreStateId::from_str(s).map(Self) + CoreStateId::from_str(s) + .map(Self) + .map_err(|_| warp_utils::reject::custom_bad_request(format!("Invalid state ID: {}", s))) } } diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 7a93333195c..14909767dd3 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1265,7 +1265,11 @@ impl ApiTester { .unwrap(); let pubkey = state.validators[index].pubkey.clone().into(); - ProposerData { pubkey, validator_index: index as u64, slot } + ProposerData { + pubkey, + validator_index: index as u64, + slot, + } }) .collect::>(); diff --git a/beacon_node/http_metrics/Cargo.toml b/beacon_node/http_metrics/Cargo.toml index 19c65969cd6..5b1715e689e 100644 --- a/beacon_node/http_metrics/Cargo.toml +++ b/beacon_node/http_metrics/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] prometheus = "0.10.0" -warp = { git = "https://github.com/realbigsean/warp.git", branch = "0.2.x" } +warp = { git = "https://github.com/paulhauner/warp", branch = "cors-wildcard" } serde = { version = "1.0.116", features = ["derive"] } slog = "2.5.2" beacon_chain = { path = "../beacon_chain" } diff --git a/common/warp_utils/Cargo.toml b/common/warp_utils/Cargo.toml index eed9f780c37..1fc88ababd8 100644 --- a/common/warp_utils/Cargo.toml +++ b/common/warp_utils/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -warp = { git = "https://github.com/realbigsean/warp.git", branch = "0.2.x" } +warp = { git = "https://github.com/paulhauner/warp", branch = "cors-wildcard" } eth2 = { path = "../eth2" } types = { path = "../../consensus/types" } beacon_chain = { path = "../../beacon_node/beacon_chain" } diff --git a/consensus/types/src/aggregate_and_proof.rs b/consensus/types/src/aggregate_and_proof.rs index 1c1127ccf6b..52871226107 100644 --- a/consensus/types/src/aggregate_and_proof.rs +++ b/consensus/types/src/aggregate_and_proof.rs @@ -53,7 +53,7 @@ impl AggregateAndProof { Self { aggregator_index, - aggregate: aggregate, + aggregate, selection_proof, } } diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index f1c3695d52b..a82adeff356 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -52,7 +52,7 @@ eth2_keystore = { path = "../crypto/eth2_keystore" } account_utils = { path = "../common/account_utils" } lighthouse_version = { path = "../common/lighthouse_version" } warp_utils = { path = "../common/warp_utils" } -warp = { git = "https://github.com/realbigsean/warp.git", branch = "0.2.x" } +warp = { git = "https://github.com/paulhauner/warp", branch = "cors-wildcard" } hyper = "0.13.8" serde_utils = { path = "../consensus/serde_utils" } libsecp256k1 = "0.3.5" From 74d81e660586dd350fe2f086ca5db032a5fecc9c Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 26 Oct 2020 15:35:30 -0400 Subject: [PATCH 03/22] test logging ahead of error handling in beacon api --- beacon_node/http_api/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 48e52495169..b7363712f66 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1860,6 +1860,7 @@ pub fn serve( .boxed()) .boxed() // Maps errors into HTTP responses. + .with(slog_logging(log.clone())) .recover(warp_utils::reject::handle_rejection) .with(slog_logging(log.clone())) .with(prometheus_metrics()) From 8473a299242849cdfda078fa23c56c99df838e63 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 26 Oct 2020 16:46:16 -0400 Subject: [PATCH 04/22] add validator balances endpoint --- beacon_node/http_api/src/lib.rs | 62 +++++++++++++++++--- beacon_node/http_api/tests/tests.rs | 87 ++++++++++++++++++++++++++++- common/eth2/src/lib.rs | 29 ++++++++++ common/eth2/src/types.rs | 13 +++++ common/warp_utils/src/reject.rs | 3 + 5 files changed, 184 insertions(+), 10 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b7363712f66..7ffa83dcf6d 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -421,6 +421,50 @@ pub fn serve( }) }); + // GET beacon/states/{state_id}/validator_balances?id + let get_beacon_state_validator_balances = beacon_states_path + .clone() + .and(warp::path("validator_balances")) + .and(warp::path::end()) + .and(warp::query::()) + .and_then( + |state_id: StateId, + chain: Arc>, + query: api_types::ValidatorBalancesQuery| { + blocking_json_task(move || { + state_id + .map_state(&chain, |state| { + Ok(state + .validators + .iter() + .zip(state.balances.iter()) + .enumerate() + // filter by validator id(s) if provided + .filter(|(index, (validator, _))| { + query.id.as_ref().map_or(true, |ids| { + ids.0.iter().any(|id| match id { + ValidatorId::PublicKey(pubkey) => { + &validator.pubkey == pubkey + } + ValidatorId::Index(param_index) => { + *param_index == *index as u64 + } + }) + }) + }) + .map(|(index, (_, balance))| { + Some(api_types::ValidatorBalanceData { + index: index as u64, + balance: *balance, + }) + }) + .collect::>()) + }) + .map(api_types::GenericResponse::from) + }) + }, + ); + // GET beacon/states/{state_id}/validators let get_beacon_state_validators = beacon_states_path .clone() @@ -537,8 +581,8 @@ pub fn serve( } else { CommitteeCache::initialized(state, epoch, &chain.spec).map(Cow::Owned) } - .map_err(BeaconChainError::BeaconStateError) - .map_err(warp_utils::reject::beacon_chain_error)?; + .map_err(BeaconChainError::BeaconStateError) + .map_err(warp_utils::reject::beacon_chain_error)?; // Use either the supplied slot or all slots in the epoch. let slots = query.slot.map(|slot| vec![slot]).unwrap_or_else(|| { @@ -566,11 +610,11 @@ pub fn serve( let committee = committee_cache .get_beacon_committee(slot, index) .ok_or_else(|| { - warp_utils::reject::custom_bad_request(format!( - "committee index {} does not exist in epoch {}", - index, epoch - )) - })?; + warp_utils::reject::custom_bad_request(format!( + "committee index {} does not exist in epoch {}", + index, epoch + )) + })?; response.push(api_types::CommitteeData { index, @@ -1605,7 +1649,7 @@ pub fn serve( return Err(warp_utils::reject::object_invalid(format!( "gossip verification failed: {:?}", e - ))) + ))); } }; @@ -1808,6 +1852,7 @@ pub fn serve( .or(get_beacon_state_root.boxed()) .or(get_beacon_state_fork.boxed()) .or(get_beacon_state_finality_checkpoints.boxed()) + .or(get_beacon_state_validator_balances.boxed()) .or(get_beacon_state_validators.boxed()) .or(get_beacon_state_validators_id.boxed()) .or(get_beacon_state_committees.boxed()) @@ -1860,7 +1905,6 @@ pub fn serve( .boxed()) .boxed() // Maps errors into HTTP responses. - .with(slog_logging(log.clone())) .recover(warp_utils::reject::handle_rejection) .with(slog_logging(log.clone())) .with(prometheus_metrics()) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 14909767dd3..dcd3b820bf4 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -409,6 +409,87 @@ impl ApiTester { self } + pub async fn test_beacon_states_validator_balances(self) -> Self { + for state_id in self.interesting_state_ids() { + for validator_indices in self.interesting_validator_indices() { + let state_opt = self.get_state(state_id); + let validators: Vec = match state_opt.as_ref() { + Some(state) => state.validators.clone().into(), + None => vec![], + }; + let validator_index_ids = validator_indices + .iter() + .cloned() + .map(|i| ValidatorId::Index(i)) + .collect::>(); + let validator_pubkey_ids = validator_indices + .iter() + .cloned() + .map(|i| { + ValidatorId::PublicKey( + validators + .get(i as usize) + .map_or(PublicKeyBytes::empty(), |val| val.pubkey.clone()), + ) + }) + .collect::>(); + + let result_index_ids = self + .client + .get_beacon_states_validator_balances( + state_id, + Some(validator_index_ids.as_slice()), + ) + .await + .unwrap() + .map(|res| res.data); + + let result_pubkey_ids = self + .client + .get_beacon_states_validator_balances( + state_id, + Some(validator_pubkey_ids.as_slice()), + ) + .await + .unwrap() + .map(|res| res.data); + + let expected = state_opt.map(|state| { + let epoch = state.current_epoch(); + let finalized_epoch = state.finalized_checkpoint.epoch; + let far_future_epoch = self.chain.spec.far_future_epoch; + + let mut validators = Vec::with_capacity(validator_indices.len()); + + for i in validator_indices { + if i >= state.validators.len() as u64 { + continue; + } + let validator = state.validators[i as usize].clone(); + validators.push(ValidatorData { + index: i as u64, + balance: state.balances[i as usize], + status: ValidatorStatus::from_validator( + Some(&validator), + epoch, + finalized_epoch, + far_future_epoch, + ), + validator, + }); + } + + validators + }); + + assert_eq!(result_index_ids, expected, "{:?}", state_id); + assert_eq!(result_pubkey_ids, expected, "{:?}", state_id); + } + } + + self + } + pub async fn test_beacon_states_validators(self) -> Self { for state_id in self.interesting_state_ids() { let result = self @@ -1618,7 +1699,11 @@ async fn beacon_states_finality_checkpoints() { #[tokio::test(core_threads = 2)] async fn beacon_states_validators() { - ApiTester::new().test_beacon_states_validators().await; + ApiTester::new() + .test_beacon_states_validator_balances + .await + .test_beacon_states_validators() + .await; } #[tokio::test(core_threads = 2)] diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 8a0ffbe35ca..eac6ac4315e 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -210,6 +210,35 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } + /// `GET beacon/states/{state_id}/validator_balances?id` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_states_validator_balances( + &self, + state_id: StateId, + ids: Option<&[ValidatorId]>, + ) -> Result>>, Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("states") + .push(&state_id.to_string()) + .push("validators"); + + if let Some(ids) = ids { + let id_string = ids + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(","); + path.query_pairs_mut().append_pair("id", &id_string); + } + + self.get_opt(path).await + } + /// `GET beacon/states/{state_id}/validators` /// /// Returns `Ok(None)` on a 404 error. diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index f5bd0274e4a..9e7d81551a4 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -206,6 +206,14 @@ pub struct ValidatorData { pub validator: Validator, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ValidatorBalanceData { + #[serde(with = "serde_utils::quoted_u64")] + pub index: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub balance: u64, +} + // TODO: This does not currently match the spec, but I'm going to try and change the spec using // this proposal: // @@ -364,6 +372,11 @@ impl TryFrom for QueryVec { } } +#[derive(Clone, Deserialize)] +pub struct ValidatorBalancesQuery { + pub id: Option>, +} + #[derive(Clone, Deserialize)] pub struct ValidatorDutiesQuery { pub index: Option>, diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index 020fa19d8b8..977e76a6a3a 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -119,6 +119,9 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result() { message = format!("BAD_REQUEST: body deserialize error: {}", e); code = StatusCode::BAD_REQUEST; From e266cebca40328a0a53bcf2266cc447d59ef746f Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 27 Oct 2020 17:35:31 -0400 Subject: [PATCH 05/22] fix error codes in beacon api --- beacon_node/http_api/src/lib.rs | 48 ++++++++++++++++++++------------- common/warp_utils/src/reject.rs | 3 --- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 7ffa83dcf6d..9dd9920bbf6 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -33,7 +33,7 @@ use state_id::StateId; use state_processing::per_slot_processing; use std::borrow::Cow; use std::convert::TryInto; -use std::future::Future; +use std::future::{ready, Future}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; @@ -376,7 +376,11 @@ pub fn serve( let beacon_states_path = eth1_v1 .and(warp::path("beacon")) .and(warp::path("states")) - .and(warp::path::param::()) + .and(warp::path::param::().or_else(|_| { + ready(Err(warp_utils::reject::custom_bad_request( + "test bead request".to_string(), + ))) + })) .and(chain_filter.clone()); // GET beacon/states/{state_id}/root @@ -1846,9 +1850,10 @@ pub fn serve( }); // Define the ultimate set of routes that will be provided to the server. - let routes = warp::get() + let get_routes = warp::get() .and( get_beacon_genesis + .boxed() .or(get_beacon_state_root.boxed()) .or(get_beacon_state_fork.boxed()) .or(get_beacon_state_finality_checkpoints.boxed()) @@ -1888,22 +1893,27 @@ pub fn serve( .or(get_lighthouse_proto_array.boxed()) .or(get_lighthouse_validator_inclusion_global.boxed()) .or(get_lighthouse_validator_inclusion.boxed()) - .or(get_lighthouse_beacon_states_ssz.boxed()) - .boxed(), + .or(get_lighthouse_beacon_states_ssz.boxed()), + ) + // Maps errors into HTTP responses. + .recover(warp_utils::reject::handle_rejection) + .with(slog_logging(log.clone())) + .with(prometheus_metrics()) + // Add a `Server` header. + .map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform())) + .with(cors_builder.clone().build()); + + let post_routes = warp::post() + .and( + post_beacon_blocks + .boxed() + .or(post_beacon_pool_attestations.boxed()) + .or(post_beacon_pool_attester_slashings.boxed()) + .or(post_beacon_pool_proposer_slashings.boxed()) + .or(post_beacon_pool_voluntary_exits.boxed()) + .or(post_validator_aggregate_and_proofs.boxed()) + .or(post_validator_beacon_committee_subscriptions.boxed()), ) - .or(warp::post() - .and( - post_beacon_blocks - .or(post_beacon_pool_attestations.boxed()) - .or(post_beacon_pool_attester_slashings.boxed()) - .or(post_beacon_pool_proposer_slashings.boxed()) - .or(post_beacon_pool_voluntary_exits.boxed()) - .or(post_validator_aggregate_and_proofs.boxed()) - .or(post_validator_beacon_committee_subscriptions.boxed()) - .boxed(), - ) - .boxed()) - .boxed() // Maps errors into HTTP responses. .recover(warp_utils::reject::handle_rejection) .with(slog_logging(log.clone())) @@ -1912,6 +1922,8 @@ pub fn serve( .map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform())) .with(cors_builder.build()); + let routes = get_routes.or(post_routes); + let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( SocketAddrV4::new(config.listen_addr, config.listen_port), async { diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index 977e76a6a3a..020fa19d8b8 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -119,9 +119,6 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result() { message = format!("BAD_REQUEST: body deserialize error: {}", e); code = StatusCode::BAD_REQUEST; From 32299c45d4abcff89cc92fbfc871648dd61eaa10 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 28 Oct 2020 11:03:23 -0400 Subject: [PATCH 06/22] remove unstable `future::ready` --- beacon_node/http_api/src/lib.rs | 28 +++++++++------------------- beacon_node/http_api/tests/tests.rs | 26 ++++++-------------------- common/eth2/src/lib.rs | 2 +- 3 files changed, 16 insertions(+), 40 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 9dd9920bbf6..4d3e5ff5bc6 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -33,7 +33,7 @@ use state_id::StateId; use state_processing::per_slot_processing; use std::borrow::Cow; use std::convert::TryInto; -use std::future::{ready, Future}; +use std::future::Future; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; @@ -377,9 +377,11 @@ pub fn serve( .and(warp::path("beacon")) .and(warp::path("states")) .and(warp::path::param::().or_else(|_| { - ready(Err(warp_utils::reject::custom_bad_request( - "test bead request".to_string(), - ))) + blocking_task(|| { + Err(warp_utils::reject::custom_bad_request( + "Invalid state ID".to_string(), + )) + }) })) .and(chain_filter.clone()); @@ -1850,7 +1852,7 @@ pub fn serve( }); // Define the ultimate set of routes that will be provided to the server. - let get_routes = warp::get() + let routes = warp::get() .and( get_beacon_genesis .boxed() @@ -1895,16 +1897,7 @@ pub fn serve( .or(get_lighthouse_validator_inclusion.boxed()) .or(get_lighthouse_beacon_states_ssz.boxed()), ) - // Maps errors into HTTP responses. - .recover(warp_utils::reject::handle_rejection) - .with(slog_logging(log.clone())) - .with(prometheus_metrics()) - // Add a `Server` header. - .map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform())) - .with(cors_builder.clone().build()); - - let post_routes = warp::post() - .and( + .or(warp::post().and( post_beacon_blocks .boxed() .or(post_beacon_pool_attestations.boxed()) @@ -1913,8 +1906,7 @@ pub fn serve( .or(post_beacon_pool_voluntary_exits.boxed()) .or(post_validator_aggregate_and_proofs.boxed()) .or(post_validator_beacon_committee_subscriptions.boxed()), - ) - // Maps errors into HTTP responses. + )) .recover(warp_utils::reject::handle_rejection) .with(slog_logging(log.clone())) .with(prometheus_metrics()) @@ -1922,8 +1914,6 @@ pub fn serve( .map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform())) .with(cors_builder.build()); - let routes = get_routes.or(post_routes); - let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( SocketAddrV4::new(config.listen_addr, config.listen_port), async { diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index dcd3b820bf4..7887e09440a 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -443,7 +443,6 @@ impl ApiTester { .await .unwrap() .map(|res| res.data); - let result_pubkey_ids = self .client .get_beacon_states_validator_balances( @@ -455,28 +454,15 @@ impl ApiTester { .map(|res| res.data); let expected = state_opt.map(|state| { - let epoch = state.current_epoch(); - let finalized_epoch = state.finalized_checkpoint.epoch; - let far_future_epoch = self.chain.spec.far_future_epoch; - let mut validators = Vec::with_capacity(validator_indices.len()); for i in validator_indices { - if i >= state.validators.len() as u64 { - continue; + if i < state.balances.len() as u64 { + validators.push(ValidatorBalanceData { + index: i as u64, + balance: state.balances[i as usize], + }); } - let validator = state.validators[i as usize].clone(); - validators.push(ValidatorData { - index: i as u64, - balance: state.balances[i as usize], - status: ValidatorStatus::from_validator( - Some(&validator), - epoch, - finalized_epoch, - far_future_epoch, - ), - validator, - }); } validators @@ -1700,7 +1686,7 @@ async fn beacon_states_finality_checkpoints() { #[tokio::test(core_threads = 2)] async fn beacon_states_validators() { ApiTester::new() - .test_beacon_states_validator_balances + .test_beacon_states_validator_balances() .await .test_beacon_states_validators() .await; diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index eac6ac4315e..4fe6ebfcf05 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -225,7 +225,7 @@ impl BeaconNodeHttpClient { .push("beacon") .push("states") .push(&state_id.to_string()) - .push("validators"); + .push("validator_balances"); if let Some(ids) = ids { let id_string = ids From e437ce52ebf19e1b5da2b4cf07b5f4a41786ab79 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 28 Oct 2020 13:35:24 -0400 Subject: [PATCH 07/22] update aggregate and proof endpoint to accept arrays --- beacon_node/beacon_chain/src/beacon_chain.rs | 4 +- beacon_node/http_api/src/lib.rs | 64 +++++++++++------- beacon_node/http_api/tests/tests.rs | 4 +- common/eth2/src/lib.rs | 4 +- validator_client/src/attestation_service.rs | 69 ++++++++++++-------- 5 files changed, 85 insertions(+), 60 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e387150fe80..1f3a18b3e3e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -984,9 +984,9 @@ impl BeaconChain { /// /// - `VerifiedUnaggregatedAttestation` /// - `VerifiedAggregatedAttestation` - pub fn apply_attestation_to_fork_choice<'a>( + pub fn apply_attestation_to_fork_choice( &self, - verified: &'a impl SignatureVerifiedAttestation, + verified: &impl SignatureVerifiedAttestation, ) -> Result<(), Error> { let _timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 4d3e5ff5bc6..e6fb25649c0 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1638,49 +1638,63 @@ pub fn serve( .and(network_tx_filter.clone()) .and_then( |chain: Arc>, - aggregate: SignedAggregateAndProof, + aggregates: Vec>, network_tx: UnboundedSender>| { blocking_json_task(move || { - let aggregate = + let mut verified_aggregates = Vec::new(); + + // Verify that all messages in the post are valid before processing further + for aggregate in aggregates.as_slice() { match chain.verify_aggregated_attestation_for_gossip(aggregate.clone()) { - Ok(aggregate) => aggregate, + Ok(verified_aggregate) => verified_aggregates.push(verified_aggregate), // If we already know the attestation, don't broadcast it or attempt to // further verify it. Return success. // // It's reasonably likely that two different validators produce // identical aggregates, especially if they're using the same beacon // node. - Err(AttnError::AttestationAlreadyKnown(_)) => return Ok(()), + Err(AttnError::AttestationAlreadyKnown(_)) => continue, Err(e) => { return Err(warp_utils::reject::object_invalid(format!( "gossip verification failed: {:?}", e ))); } - }; - - publish_pubsub_message( - &network_tx, - PubsubMessage::AggregateAndProofAttestation(Box::new( - aggregate.aggregate().clone(), - )), - )?; + } + } - chain - .apply_attestation_to_fork_choice(&aggregate) - .map_err(|e| { - warp_utils::reject::broadcast_without_import(format!( - "not applied to fork choice: {:?}", - e + let messages: Vec> = verified_aggregates + .iter() + .map(|verified_aggregate| { + PubsubMessage::AggregateAndProofAttestation(Box::new( + verified_aggregate.aggregate().clone(), )) - })?; + }) + .collect(); - chain.add_to_block_inclusion_pool(aggregate).map_err(|e| { - warp_utils::reject::broadcast_without_import(format!( - "not applied to block inclusion pool: {:?}", - e - )) - })?; + if !messages.is_empty() { + publish_network_message(&network_tx, NetworkMessage::Publish { messages })?; + } + + for verified_aggregate in verified_aggregates { + chain + .apply_attestation_to_fork_choice(&verified_aggregate) + .map_err(|e| { + warp_utils::reject::broadcast_without_import(format!( + "not applied to fork choice: {:?}", + e + )) + })?; + + chain + .add_to_block_inclusion_pool(verified_aggregate) + .map_err(|e| { + warp_utils::reject::broadcast_without_import(format!( + "not applied to block inclusion pool: {:?}", + e + )) + })?; + } Ok(()) }) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 7887e09440a..2ffa6b03c27 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1554,7 +1554,7 @@ impl ApiTester { let aggregate = self.get_aggregate().await; self.client - .post_validator_aggregate_and_proof::(&aggregate) + .post_validator_aggregate_and_proof::(&vec![aggregate]) .await .unwrap(); @@ -1569,7 +1569,7 @@ impl ApiTester { aggregate.message.aggregate.data.slot += 1; self.client - .post_validator_aggregate_and_proof::(&aggregate) + .post_validator_aggregate_and_proof::(&vec![aggregate]) .await .unwrap_err(); diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 4fe6ebfcf05..1333a5bdb41 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -846,7 +846,7 @@ impl BeaconNodeHttpClient { /// `POST validator/aggregate_and_proofs` pub async fn post_validator_aggregate_and_proof( &self, - aggregate: &SignedAggregateAndProof, + aggregates: &Vec>, ) -> Result<(), Error> { let mut path = self.eth_path()?; @@ -855,7 +855,7 @@ impl BeaconNodeHttpClient { .push("validator") .push("aggregate_and_proofs"); - self.post(path, aggregate).await?; + self.post(path, aggregates).await?; Ok(()) } diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index d675ebda2e8..60c5d18d579 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -437,6 +437,8 @@ impl AttestationService { .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))? .data; + let mut signed_aggregate_and_proofs = Vec::new(); + for duty_and_proof in validator_duties { let selection_proof = if let Some(proof) = duty_and_proof.selection_proof.as_ref() { proof @@ -462,44 +464,53 @@ impl AttestationService { continue; } - let signed_aggregate_and_proof = if let Some(aggregate) = - self.validator_store.produce_signed_aggregate_and_proof( - pubkey, - validator_index, - aggregated_attestation.clone(), - selection_proof.clone(), - ) { - aggregate + if let Some(aggregate) = self.validator_store.produce_signed_aggregate_and_proof( + pubkey, + validator_index, + aggregated_attestation.clone(), + selection_proof.clone(), + ) { + signed_aggregate_and_proofs.push(aggregate); } else { crit!(log, "Failed to sign attestation"); continue; }; + } - let attestation = &signed_aggregate_and_proof.message.aggregate; - + if !signed_aggregate_and_proofs.is_empty() { match self .beacon_node - .post_validator_aggregate_and_proof(&signed_aggregate_and_proof) + .post_validator_aggregate_and_proof(&signed_aggregate_and_proofs) .await { - Ok(()) => info!( - log, - "Successfully published attestation"; - "aggregator" => signed_aggregate_and_proof.message.aggregator_index, - "signatures" => attestation.aggregation_bits.num_set_bits(), - "head_block" => format!("{:?}", attestation.data.beacon_block_root), - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - "type" => "aggregated", - ), - Err(e) => crit!( - log, - "Failed to publish attestation"; - "error" => e.to_string(), - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - "type" => "aggregated", - ), + Ok(()) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = &signed_aggregate_and_proof.message.aggregate; + info!( + log, + "Successfully published attestations"; + "aggregator" => signed_aggregate_and_proof.message.aggregator_index, + "signatures" => attestation.aggregation_bits.num_set_bits(), + "head_block" => format!("{:?}", attestation.data.beacon_block_root), + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + "type" => "aggregated", + ); + } + } + Err(e) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = &signed_aggregate_and_proof.message.aggregate; + crit!( + log, + "Failed to publish attestation"; + "error" => e.to_string(), + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + "type" => "aggregated", + ); + } + } } } From acde786db63fdf50ba6414d3f8eba94c944af73d Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 28 Oct 2020 17:00:06 -0400 Subject: [PATCH 08/22] update attester duties endpoint to a POST, only query for proposer duties once per epoch --- beacon_node/http_api/src/lib.rs | 224 ++++++++++++------------- beacon_node/http_api/tests/tests.rs | 22 ++- common/eth2/src/lib.rs | 69 ++++---- common/eth2/src/types.rs | 7 +- validator_client/src/duties_service.rs | 52 +++--- validator_client/src/validator_duty.rs | 105 +++++++----- 6 files changed, 246 insertions(+), 233 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index e6fb25649c0..949b7d954de 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1364,18 +1364,115 @@ pub fn serve( * validator */ - // GET validator/duties/attester/{epoch} - let get_validator_duties_attester = eth1_v1 + // GET validator/duties/proposer/{epoch} + let get_validator_duties_proposer = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("duties")) + .and(warp::path("proposer")) + .and(warp::path::param::()) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(chain_filter.clone()) + .and(beacon_proposer_cache()) + .and_then( + |epoch: Epoch, + chain: Arc>, + beacon_proposer_cache: Arc>| { + blocking_json_task(move || { + beacon_proposer_cache + .lock() + .get_proposers(&chain, epoch) + .map(api_types::GenericResponse::from) + }) + }, + ); + + // GET validator/blocks/{slot} + let get_validator_blocks = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("blocks")) + .and(warp::path::param::()) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(warp::query::()) + .and(chain_filter.clone()) + .and_then( + |slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc>| { + blocking_json_task(move || { + let randao_reveal = (&query.randao_reveal).try_into().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "randao reveal is not valid BLS signature: {:?}", + e + )) + })?; + + chain + .produce_block(randao_reveal, slot, query.graffiti.map(Into::into)) + .map(|block_and_state| block_and_state.0) + .map(api_types::GenericResponse::from) + .map_err(warp_utils::reject::block_production_error) + }) + }, + ); + + // GET validator/attestation_data?slot,committee_index + let get_validator_attestation_data = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("attestation_data")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(chain_filter.clone()) + .and_then( + |query: api_types::ValidatorAttestationDataQuery, chain: Arc>| { + blocking_json_task(move || { + chain + .produce_unaggregated_attestation(query.slot, query.committee_index) + .map(|attestation| attestation.data) + .map(api_types::GenericResponse::from) + .map_err(warp_utils::reject::beacon_chain_error) + }) + }, + ); + + // GET validator/aggregate_attestation?attestation_data_root,slot + let get_validator_aggregate_attestation = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("aggregate_attestation")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(chain_filter.clone()) + .and_then( + |query: api_types::ValidatorAggregateAttestationQuery, chain: Arc>| { + blocking_json_task(move || { + chain + .get_aggregated_attestation_by_slot_and_root( + query.slot, + &query.attestation_data_root, + ) + .map(api_types::GenericResponse::from) + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "no matching aggregate found".to_string(), + ) + }) + }) + }, + ); + + // POST validator/duties/attester/{epoch} + let post_validator_duties_attester = eth1_v1 .and(warp::path("validator")) .and(warp::path("duties")) .and(warp::path("attester")) .and(warp::path::param::()) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) - .and(warp::query::()) + .and(warp::body::json()) .and(chain_filter.clone()) .and_then( - |epoch: Epoch, query: api_types::ValidatorDutiesQuery, chain: Arc>| { + |epoch: Epoch, indices: api_types::ValidatorIndexData, chain: Arc>| { blocking_json_task(move || { let current_epoch = chain .epoch() @@ -1391,30 +1488,22 @@ pub fn serve( let validator_count = StateId::head() .map_state(&chain, |state| Ok(state.validators.len() as u64))?; - let indices = query - .index - .as_ref() - .map(|index| index.0.clone()) - .map(Result::Ok) - .unwrap_or_else(|| { - Ok::<_, warp::Rejection>((0..validator_count).collect()) - })?; - let pubkeys = indices - .into_iter() - .filter(|i| *i < validator_count as u64) + .0 + .iter() + .filter(|i| **i < validator_count as u64) .map(|i| { let pubkey = chain - .validator_pubkey(i as usize) + .validator_pubkey(*i as usize) .map_err(warp_utils::reject::beacon_chain_error)? .ok_or_else(|| { warp_utils::reject::custom_bad_request(format!( "unknown validator index {}", - i + *i )) })?; - Ok((i, pubkey)) + Ok((*i, pubkey)) }) .collect::, warp::Rejection>>()?; @@ -1530,103 +1619,6 @@ pub fn serve( }, ); - // GET validator/duties/proposer/{epoch} - let get_validator_duties_proposer = eth1_v1 - .and(warp::path("validator")) - .and(warp::path("duties")) - .and(warp::path("proposer")) - .and(warp::path::param::()) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(chain_filter.clone()) - .and(beacon_proposer_cache()) - .and_then( - |epoch: Epoch, - chain: Arc>, - beacon_proposer_cache: Arc>| { - blocking_json_task(move || { - beacon_proposer_cache - .lock() - .get_proposers(&chain, epoch) - .map(api_types::GenericResponse::from) - }) - }, - ); - - // GET validator/blocks/{slot} - let get_validator_blocks = eth1_v1 - .and(warp::path("validator")) - .and(warp::path("blocks")) - .and(warp::path::param::()) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(warp::query::()) - .and(chain_filter.clone()) - .and_then( - |slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc>| { - blocking_json_task(move || { - let randao_reveal = (&query.randao_reveal).try_into().map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "randao reveal is not valid BLS signature: {:?}", - e - )) - })?; - - chain - .produce_block(randao_reveal, slot, query.graffiti.map(Into::into)) - .map(|block_and_state| block_and_state.0) - .map(api_types::GenericResponse::from) - .map_err(warp_utils::reject::block_production_error) - }) - }, - ); - - // GET validator/attestation_data?slot,committee_index - let get_validator_attestation_data = eth1_v1 - .and(warp::path("validator")) - .and(warp::path("attestation_data")) - .and(warp::path::end()) - .and(warp::query::()) - .and(not_while_syncing_filter.clone()) - .and(chain_filter.clone()) - .and_then( - |query: api_types::ValidatorAttestationDataQuery, chain: Arc>| { - blocking_json_task(move || { - chain - .produce_unaggregated_attestation(query.slot, query.committee_index) - .map(|attestation| attestation.data) - .map(api_types::GenericResponse::from) - .map_err(warp_utils::reject::beacon_chain_error) - }) - }, - ); - - // GET validator/aggregate_attestation?attestation_data_root,slot - let get_validator_aggregate_attestation = eth1_v1 - .and(warp::path("validator")) - .and(warp::path("aggregate_attestation")) - .and(warp::path::end()) - .and(warp::query::()) - .and(not_while_syncing_filter.clone()) - .and(chain_filter.clone()) - .and_then( - |query: api_types::ValidatorAggregateAttestationQuery, chain: Arc>| { - blocking_json_task(move || { - chain - .get_aggregated_attestation_by_slot_and_root( - query.slot, - &query.attestation_data_root, - ) - .map(api_types::GenericResponse::from) - .ok_or_else(|| { - warp_utils::reject::custom_not_found( - "no matching aggregate found".to_string(), - ) - }) - }) - }, - ); - // POST validator/aggregate_and_proofs let post_validator_aggregate_and_proofs = eth1_v1 .and(warp::path("validator")) @@ -1897,7 +1889,6 @@ pub fn serve( .or(get_node_health.boxed()) .or(get_node_peers_by_id.boxed()) .or(get_node_peers.boxed()) - .or(get_validator_duties_attester.boxed()) .or(get_validator_duties_proposer.boxed()) .or(get_validator_blocks.boxed()) .or(get_validator_attestation_data.boxed()) @@ -1918,6 +1909,7 @@ pub fn serve( .or(post_beacon_pool_attester_slashings.boxed()) .or(post_beacon_pool_proposer_slashings.boxed()) .or(post_beacon_pool_voluntary_exits.boxed()) + .or(post_validator_duties_attester.boxed()) .or(post_validator_aggregate_and_proofs.boxed()) .or(post_validator_beacon_committee_subscriptions.boxed()), )) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 2ffa6b03c27..82418484ca1 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1231,7 +1231,7 @@ impl ApiTester { if epoch > current_epoch + 1 { assert_eq!( self.client - .get_validator_duties_attester(epoch, Some(&indices)) + .post_validator_duties_attester(epoch, &indices) .await .unwrap_err() .status() @@ -1243,7 +1243,7 @@ impl ApiTester { let results = self .client - .get_validator_duties_attester(epoch, Some(&indices)) + .post_validator_duties_attester(epoch, &indices) .await .unwrap() .data; @@ -1473,17 +1473,15 @@ impl ApiTester { let fork = head.beacon_state.fork; let genesis_validators_root = self.chain.genesis_validators_root; - let mut duties = vec![]; - for i in 0..self.validator_keypairs.len() { - duties.push( - self.client - .get_validator_duties_attester(epoch, Some(&[i as u64])) - .await - .unwrap() - .data[0] - .clone(), + let duties = self + .client + .post_validator_duties_attester( + epoch, + (0..self.validator_keypairs.len() as u64).collect(), ) - } + .await + .unwrap() + .data; let (i, kp, duty, proof) = self .validator_keypairs diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 1333a5bdb41..85a9c71075b 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -137,6 +137,26 @@ impl BeaconNodeHttpClient { Ok(()) } + /// Perform a HTTP POST request, returning a JSON response. + async fn post_with_response( + &self, + url: U, + body: &V, + ) -> Result { + let response = self + .client + .post(url) + .json(body) + .send() + .await + .map_err(Error::Reqwest)?; + ok_or_error(response) + .await? + .json() + .await + .map_err(Error::Reqwest) + } + /// `GET beacon/genesis` /// /// ## Errors @@ -722,37 +742,6 @@ impl BeaconNodeHttpClient { self.get(path).await } - /// `GET validator/duties/attester/{epoch}?index` - /// - /// ## Note - /// - /// The `index` query parameter accepts a list of validator indices. - pub async fn get_validator_duties_attester( - &self, - epoch: Epoch, - index: Option<&[u64]>, - ) -> Result>, Error> { - let mut path = self.eth_path()?; - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("validator") - .push("duties") - .push("attester") - .push(&epoch.to_string()); - - if let Some(index) = index { - let string = index - .iter() - .map(|i| i.to_string()) - .collect::>() - .join(","); - path.query_pairs_mut().append_pair("index", &string); - } - - self.get(path).await - } - /// `GET validator/duties/proposer/{epoch}` pub async fn get_validator_duties_proposer( &self, @@ -843,6 +832,24 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } + /// `POST validator/duties/attester/{epoch}` + pub async fn post_validator_duties_attester( + &self, + epoch: Epoch, + indices: &Vec, + ) -> Result>, Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("duties") + .push("attester") + .push(&epoch.to_string()); + + self.post_with_response(path, indices).await + } + /// `POST validator/aggregate_and_proofs` pub async fn post_validator_aggregate_and_proof( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 9e7d81551a4..0fe2682fa09 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -377,10 +377,9 @@ pub struct ValidatorBalancesQuery { pub id: Option>, } -#[derive(Clone, Deserialize)] -pub struct ValidatorDutiesQuery { - pub index: Option>, -} +#[derive(Clone, Serialize, Deserialize)] +#[serde(transparent)] +pub struct ValidatorIndexData(#[serde(with = "serde_utils::quoted_u64_vec")] pub Vec); #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct AttesterData { diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 7f6d33fe85b..554ea569683 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -589,28 +589,28 @@ impl DutiesService { let mut invalid = 0; let mut validator_subscriptions = vec![]; - for pubkey in self.validator_store.voting_pubkeys() { - let remote_duties = match ValidatorDuty::download( - &self.beacon_node, - current_epoch, - request_epoch, - pubkey, - ) - .await - { - Ok(duties) => duties, - Err(e) => { - error!( - log, - "Failed to download validator duties"; - "error" => e - ); - continue; - } - }; + let remote_duties: Vec = match ValidatorDuty::download( + &self.beacon_node, + current_epoch, + request_epoch, + self.validator_store.voting_pubkeys().as_slice(), + ) + .await + { + Ok(duties) => duties, + Err(e) => { + error!( + log, + "Failed to download validator duties"; + "error" => e + ); + vec![] + } + }; + remote_duties.iter().for_each(|remote_duty| { // Convert the remote duties into our local representation. - let duties: DutyAndProof = remote_duties.clone().into(); + let duties: DutyAndProof = remote_duty.clone().into(); let validator_pubkey = duties.duty.validator_pubkey.clone(); @@ -628,9 +628,9 @@ impl DutiesService { debug!( log, "First duty assignment for validator"; - "proposal_slots" => format!("{:?}", &remote_duties.block_proposal_slots), - "attestation_slot" => format!("{:?}", &remote_duties.attestation_slot), - "validator" => format!("{:?}", &remote_duties.validator_pubkey) + "proposal_slots" => format!("{:?}", &remote_duty.block_proposal_slots), + "attestation_slot" => format!("{:?}", &remote_duty.attestation_slot), + "validator" => format!("{:?}", &remote_duty.validator_pubkey) ); new_validator += 1; } @@ -642,10 +642,10 @@ impl DutiesService { } if let Some(is_aggregator) = - self.store.is_aggregator(&validator_pubkey, request_epoch) + self.store.is_aggregator(&validator_pubkey, request_epoch) { if outcome.is_subscription_candidate() { - if let Some(subscription) = remote_duties.subscription(is_aggregator) { + if let Some(subscription) = remote_duty.subscription(is_aggregator) { validator_subscriptions.push(subscription) } } @@ -657,7 +657,7 @@ impl DutiesService { "error" => e ), } - } + }); if invalid > 0 { error!( diff --git a/validator_client/src/validator_duty.rs b/validator_client/src/validator_duty.rs index e5f56c38555..7014d7db851 100644 --- a/validator_client/src/validator_duty.rs +++ b/validator_client/src/validator_duty.rs @@ -1,8 +1,10 @@ +use eth2::types::AttesterData; use eth2::{ types::{BeaconCommitteeSubscription, StateId, ValidatorId}, BeaconNodeHttpClient, }; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use types::{CommitteeIndex, Epoch, PublicKey, PublicKeyBytes, Slot}; /// This struct is being used as a shim since we deprecated the `rest_api` in favour of `http_api`. @@ -53,58 +55,73 @@ impl ValidatorDuty { beacon_node: &BeaconNodeHttpClient, current_epoch: Epoch, request_epoch: Epoch, - pubkey: PublicKey, - ) -> Result { - let pubkey_bytes = PublicKeyBytes::from(&pubkey); + pubkeys: &[PublicKey], + ) -> Result, String> { + //TODO: Can we just store validator indices in the validator definitions.yml? + let mut duties = Vec::new(); + let mut validator_indices = Vec::new(); + let mut validator_id_tuples = Vec::new(); - let validator_index = if let Some(index) = beacon_node - .get_beacon_states_validator_id( - StateId::Head, - &ValidatorId::PublicKey(pubkey_bytes.clone()), - ) - .await - .map_err(|e| format!("Failed to get validator index: {}", e))? - .map(|body| body.data.index) - { - index - } else { - return Ok(Self::no_duties(pubkey)); - }; + for pubkey in pubkeys { + let pubkey_bytes = PublicKeyBytes::from(pubkey); + if let Some(index) = beacon_node + .get_beacon_states_validator_id( + StateId::Head, + &ValidatorId::PublicKey(pubkey_bytes.clone()), + ) + .await + .map_err(|e| format!("Failed to get validator index: {}", e))? + .map(|body| body.data.index) + { + validator_indices.push(index.clone()); + validator_id_tuples.push((index, pubkey.clone())); + } else { + duties.push(Self::no_duties(pubkey.clone())) + } + } - if let Some(attester) = beacon_node - .get_validator_duties_attester(request_epoch, Some(&[validator_index])) + let attester_data: HashMap = beacon_node + .post_validator_duties_attester(request_epoch, &validator_indices) .await .map_err(|e| format!("Failed to get attester duties: {}", e))? .data - .first() - { - let block_proposal_slots = if current_epoch == request_epoch { - beacon_node - .get_validator_duties_proposer(current_epoch) - .await - .map_err(|e| format!("Failed to get proposer indices: {}", e))? - .data - .into_iter() - .filter(|data| data.pubkey == pubkey_bytes) - .map(|data| data.slot) - .collect() - } else { - vec![] - }; + .into_iter() + .map(|data| (data.validator_index, data)) + .collect(); - Ok(ValidatorDuty { - validator_pubkey: pubkey, - validator_index: Some(attester.validator_index), - attestation_slot: Some(attester.slot), - attestation_committee_index: Some(attester.committee_index), - attestation_committee_position: Some(attester.validator_committee_index as usize), - committee_count_at_slot: Some(attester.committees_at_slot), - committee_length: Some(attester.committee_length), - block_proposal_slots: Some(block_proposal_slots), - }) + //TODO: is it possible to have one validator propose more than once per epoch? + let block_proposal_slots: HashMap> = if current_epoch == request_epoch { + beacon_node + .get_validator_duties_proposer(current_epoch) + .await + .map_err(|e| format!("Failed to get proposer indices: {}", e))? + .data + .into_iter() + .map(|data| (data.validator_index, vec![data.slot])) + .collect() } else { - Ok(Self::no_duties(pubkey)) + HashMap::new() + }; + + for validator_id_tuple in validator_id_tuples { + if let Some(attester) = attester_data.get(&validator_id_tuple.0) { + duties.push(ValidatorDuty { + validator_pubkey: validator_id_tuple.1, + validator_index: Some(attester.validator_index), + attestation_slot: Some(attester.slot), + attestation_committee_index: Some(attester.committee_index), + attestation_committee_position: Some( + attester.validator_committee_index as usize, + ), + committee_count_at_slot: Some(attester.committees_at_slot), + committee_length: Some(attester.committee_length), + block_proposal_slots: block_proposal_slots.get(&validator_id_tuple.0).cloned(), + }); + } else { + duties.push(Self::no_duties(validator_id_tuple.1)) + } } + Ok(duties) } /// Return `true` if these validator duties are equal, ignoring their `block_proposal_slots`. From 80605388775e6e1502b8213467e0a421d2955333 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 28 Oct 2020 17:16:45 -0400 Subject: [PATCH 09/22] cargo clippy --- beacon_node/http_api/tests/tests.rs | 8 ++++---- common/eth2/src/lib.rs | 8 ++++---- validator_client/src/attestation_service.rs | 2 +- validator_client/src/validator_duty.rs | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 82418484ca1..c58714828d7 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1231,7 +1231,7 @@ impl ApiTester { if epoch > current_epoch + 1 { assert_eq!( self.client - .post_validator_duties_attester(epoch, &indices) + .post_validator_duties_attester(epoch, indices) .await .unwrap_err() .status() @@ -1243,7 +1243,7 @@ impl ApiTester { let results = self .client - .post_validator_duties_attester(epoch, &indices) + .post_validator_duties_attester(epoch, indices) .await .unwrap() .data; @@ -1552,7 +1552,7 @@ impl ApiTester { let aggregate = self.get_aggregate().await; self.client - .post_validator_aggregate_and_proof::(&vec![aggregate]) + .post_validator_aggregate_and_proof::(vec![aggregate]) .await .unwrap(); @@ -1567,7 +1567,7 @@ impl ApiTester { aggregate.message.aggregate.data.slot += 1; self.client - .post_validator_aggregate_and_proof::(&vec![aggregate]) + .post_validator_aggregate_and_proof::(vec![aggregate]) .await .unwrap_err(); diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 85a9c71075b..2b9604cfdba 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -836,7 +836,7 @@ impl BeaconNodeHttpClient { pub async fn post_validator_duties_attester( &self, epoch: Epoch, - indices: &Vec, + indices: Vec, ) -> Result>, Error> { let mut path = self.eth_path()?; @@ -847,13 +847,13 @@ impl BeaconNodeHttpClient { .push("attester") .push(&epoch.to_string()); - self.post_with_response(path, indices).await + self.post_with_response(path, &indices).await } /// `POST validator/aggregate_and_proofs` pub async fn post_validator_aggregate_and_proof( &self, - aggregates: &Vec>, + aggregates: Vec>, ) -> Result<(), Error> { let mut path = self.eth_path()?; @@ -862,7 +862,7 @@ impl BeaconNodeHttpClient { .push("validator") .push("aggregate_and_proofs"); - self.post(path, aggregates).await?; + self.post(path, &aggregates).await?; Ok(()) } diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 60c5d18d579..fbc80959db6 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -480,7 +480,7 @@ impl AttestationService { if !signed_aggregate_and_proofs.is_empty() { match self .beacon_node - .post_validator_aggregate_and_proof(&signed_aggregate_and_proofs) + .post_validator_aggregate_and_proof(signed_aggregate_and_proofs.clone()) .await { Ok(()) => { diff --git a/validator_client/src/validator_duty.rs b/validator_client/src/validator_duty.rs index 7014d7db851..19c3118a844 100644 --- a/validator_client/src/validator_duty.rs +++ b/validator_client/src/validator_duty.rs @@ -73,7 +73,7 @@ impl ValidatorDuty { .map_err(|e| format!("Failed to get validator index: {}", e))? .map(|body| body.data.index) { - validator_indices.push(index.clone()); + validator_indices.push(index); validator_id_tuples.push((index, pubkey.clone())); } else { duties.push(Self::no_duties(pubkey.clone())) @@ -81,7 +81,7 @@ impl ValidatorDuty { } let attester_data: HashMap = beacon_node - .post_validator_duties_attester(request_epoch, &validator_indices) + .post_validator_duties_attester(request_epoch, validator_indices) .await .map_err(|e| format!("Failed to get attester duties: {}", e))? .data From c7348341c1def45789c016baa9ab5fe061781eec Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 28 Oct 2020 17:26:01 -0400 Subject: [PATCH 10/22] cargo clippy --- beacon_node/http_api/tests/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index c58714828d7..ab33cfdb9ec 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1243,7 +1243,7 @@ impl ApiTester { let results = self .client - .post_validator_duties_attester(epoch, indices) + .post_validator_duties_attester(epoch, indices.clone()) .await .unwrap() .data; From 5bba1d854adc157fce222a4e6957372238dd1c41 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Thu, 29 Oct 2020 11:09:54 -0400 Subject: [PATCH 11/22] check for known validator indices before querying the beacon node --- validator_client/src/duties_service.rs | 33 +++++++++++++- validator_client/src/validator_duty.rs | 60 ++++++++++++++------------ 2 files changed, 65 insertions(+), 28 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 554ea569683..f6d58c4e21b 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -106,6 +106,10 @@ impl DutyAndProof { pub fn validator_pubkey(&self) -> &PublicKey { &self.duty.validator_pubkey } + + pub fn validator_index(&self) -> Option { + self.duty.validator_index + } } impl Into for ValidatorDuty { @@ -229,6 +233,14 @@ impl DutiesStore { .collect() } + fn get_index(&self, pubkey: &PublicKey, epoch: Epoch) -> Option { + self.store + .read() + .get(pubkey)? + .get(&epoch)? + .validator_index() + } + fn is_aggregator(&self, validator_pubkey: &PublicKey, epoch: Epoch) -> Option { Some( self.store @@ -588,12 +600,31 @@ impl DutiesService { let mut replaced = 0; let mut invalid = 0; + // Determine which pubkeys we already know the index of by checking the duties store for + // the current epoch. + let mut unknown_pubkeys = Vec::new(); + let known_pubkeys = self + .validator_store + .voting_pubkeys() + .into_iter() + .filter_map( + |pubkey| match self.store.get_index(&pubkey, current_epoch) { + Some(index) => Some((pubkey, index)), + None => { + unknown_pubkeys.push(pubkey); + None + } + }, + ) + .collect(); + let mut validator_subscriptions = vec![]; let remote_duties: Vec = match ValidatorDuty::download( &self.beacon_node, current_epoch, request_epoch, - self.validator_store.voting_pubkeys().as_slice(), + known_pubkeys, + unknown_pubkeys.as_slice(), ) .await { diff --git a/validator_client/src/validator_duty.rs b/validator_client/src/validator_duty.rs index 19c3118a844..e90eb8ef076 100644 --- a/validator_client/src/validator_duty.rs +++ b/validator_client/src/validator_duty.rs @@ -35,10 +35,10 @@ pub struct ValidatorDuty { impl ValidatorDuty { /// Instantiate `Self` as if there are no known dutes for `validator_pubkey`. - fn no_duties(validator_pubkey: PublicKey) -> Self { + fn no_duties(validator_pubkey: PublicKey, validator_index: Option) -> Self { ValidatorDuty { validator_pubkey, - validator_index: None, + validator_index, attestation_slot: None, attestation_committee_index: None, attestation_committee_position: None, @@ -55,14 +55,13 @@ impl ValidatorDuty { beacon_node: &BeaconNodeHttpClient, current_epoch: Epoch, request_epoch: Epoch, - pubkeys: &[PublicKey], + mut known_pubkeys: Vec<(PublicKey, u64)>, + unknown_pubkeys: &[PublicKey], ) -> Result, String> { - //TODO: Can we just store validator indices in the validator definitions.yml? let mut duties = Vec::new(); - let mut validator_indices = Vec::new(); - let mut validator_id_tuples = Vec::new(); - for pubkey in pubkeys { + // Query for any pubkeys we don't know the index for in the current epoch. + for pubkey in unknown_pubkeys { let pubkey_bytes = PublicKeyBytes::from(pubkey); if let Some(index) = beacon_node .get_beacon_states_validator_id( @@ -73,15 +72,18 @@ impl ValidatorDuty { .map_err(|e| format!("Failed to get validator index: {}", e))? .map(|body| body.data.index) { - validator_indices.push(index); - validator_id_tuples.push((index, pubkey.clone())); + known_pubkeys.push((pubkey.clone(), index)); } else { - duties.push(Self::no_duties(pubkey.clone())) + duties.push(Self::no_duties(pubkey.clone(), None)) } } - let attester_data: HashMap = beacon_node - .post_validator_duties_attester(request_epoch, validator_indices) + // Query attester duties for known indices, and map the response by index. + let attester_data_by_index: HashMap = beacon_node + .post_validator_duties_attester( + request_epoch, + known_pubkeys.iter().map(|(_, index)| *index).collect(), + ) .await .map_err(|e| format!("Failed to get attester duties: {}", e))? .data @@ -89,36 +91,40 @@ impl ValidatorDuty { .map(|data| (data.validator_index, data)) .collect(); - //TODO: is it possible to have one validator propose more than once per epoch? - let block_proposal_slots: HashMap> = if current_epoch == request_epoch { + // Query for all block proposer duties in the current epoch and map the response by index. + let proposal_slots_by_index: HashMap> = if current_epoch == request_epoch { beacon_node .get_validator_duties_proposer(current_epoch) .await .map_err(|e| format!("Failed to get proposer indices: {}", e))? .data .into_iter() - .map(|data| (data.validator_index, vec![data.slot])) - .collect() + .fold(HashMap::new(), |mut map, proposer_data| { + map.entry(proposer_data.validator_index) + .or_insert_with(Vec::new) + .push(proposer_data.slot); + map + }) } else { HashMap::new() }; - for validator_id_tuple in validator_id_tuples { - if let Some(attester) = attester_data.get(&validator_id_tuple.0) { + for (pubkey, index) in known_pubkeys { + if let Some(attester_data) = attester_data_by_index.get(&index) { duties.push(ValidatorDuty { - validator_pubkey: validator_id_tuple.1, - validator_index: Some(attester.validator_index), - attestation_slot: Some(attester.slot), - attestation_committee_index: Some(attester.committee_index), + validator_pubkey: pubkey, + validator_index: Some(attester_data.validator_index), + attestation_slot: Some(attester_data.slot), + attestation_committee_index: Some(attester_data.committee_index), attestation_committee_position: Some( - attester.validator_committee_index as usize, + attester_data.validator_committee_index as usize, ), - committee_count_at_slot: Some(attester.committees_at_slot), - committee_length: Some(attester.committee_length), - block_proposal_slots: block_proposal_slots.get(&validator_id_tuple.0).cloned(), + committee_count_at_slot: Some(attester_data.committees_at_slot), + committee_length: Some(attester_data.committee_length), + block_proposal_slots: proposal_slots_by_index.get(&index).cloned(), }); } else { - duties.push(Self::no_duties(validator_id_tuple.1)) + duties.push(Self::no_duties(pubkey, Some(index))) } } Ok(duties) From d31ee8786492d64931b1475955e181af324abf06 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Thu, 29 Oct 2020 11:12:30 -0400 Subject: [PATCH 12/22] revert change to `StateId`'s `FromStr` --- beacon_node/http_api/src/state_id.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index 7a9a3f34df9..11800648f25 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -110,11 +110,9 @@ impl StateId { } impl FromStr for StateId { - type Err = warp::Rejection; + type Err = String; fn from_str(s: &str) -> Result { - CoreStateId::from_str(s) - .map(Self) - .map_err(|_| warp_utils::reject::custom_bad_request(format!("Invalid state ID: {}", s))) + CoreStateId::from_str(s).map(Self) } } From 493361cc02e862ac6109853dbbc6acb1347449bb Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 30 Oct 2020 14:32:16 -0400 Subject: [PATCH 13/22] add historical lookups for proposer duties --- beacon_node/http_api/src/lib.rs | 51 +++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 47510c07718..4c5483ad445 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -39,7 +39,7 @@ use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use types::{ Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec, - Hash256, ProposerSlashing, PublicKey, RelativeEpoch, SignedAggregateAndProof, + Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, YamlConfig, }; use warp::{http::Response, Filter}; @@ -1408,10 +1408,51 @@ pub fn serve( chain: Arc>, beacon_proposer_cache: Arc>| { blocking_json_task(move || { - beacon_proposer_cache - .lock() - .get_proposers(&chain, epoch) - .map(api_types::GenericResponse::from) + + let current_epoch = chain + .epoch() + .map_err(warp_utils::reject::beacon_chain_error)?; + + if epoch > current_epoch { + return Err(warp_utils::reject::custom_bad_request(format!( + "request epoch {} is ahead of the current epoch {}", + epoch, current_epoch + ))); + } + + // The idea is to stop historical requests from washing out the cache on the + // beacon chain, whilst allowing a VC to request duties quickly. + if epoch == current_epoch { + beacon_proposer_cache + .lock() + .get_proposers(&chain, epoch) + .map(api_types::GenericResponse::from) + } else { + let state = + StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch())) + .state(&chain)?; + + epoch + .slot_iter(T::EthSpec::slots_per_epoch()) + .map(|slot| { + state + .get_beacon_proposer_index(slot, &chain.spec) + .map_err(warp_utils::reject::beacon_state_error) + .and_then(|i| { + let pubkey = + chain.validator_pubkey(i).map_err(warp_utils::reject::beacon_chain_error)? + .ok_or_else(|| warp_utils::reject::beacon_chain_error(BeaconChainError::ValidatorPubkeyCacheIncomplete(i)) )?; + + Ok(api_types::ProposerData { + pubkey: PublicKeyBytes::from(pubkey), + validator_index: i as u64, + slot, + }) + }) + }) + .collect::, _>>() + .and_then(|proposer_data|Ok(api_types::GenericResponse::from(proposer_data))) + } }) }, ); From ef254844936cedf09176f72421ae738001459972 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 2 Nov 2020 21:22:34 -0500 Subject: [PATCH 14/22] remove comment --- beacon_node/http_api/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 64e520332df..62a97f4926e 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1434,8 +1434,6 @@ pub fn serve( ))); } - // The idea is to stop historical requests from washing out the cache on the - // beacon chain, whilst allowing a VC to request duties quickly. if epoch == current_epoch { beacon_proposer_cache .lock() From 59ef92362480cbaeb4abe21926ac0d25087407ff Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 2 Nov 2020 21:39:30 -0500 Subject: [PATCH 15/22] appease clippy --- beacon_node/http_api/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 62a97f4926e..722c1cc6085 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1463,7 +1463,7 @@ pub fn serve( }) }) .collect::, _>>() - .and_then(|proposer_data|Ok(api_types::GenericResponse::from(proposer_data))) + .map(api_types::GenericResponse::from) } }) }, From 81d3ffc9641c63583280e71f355f8e5344e7c3ea Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 6 Nov 2020 13:02:47 -0500 Subject: [PATCH 16/22] aggregate and proofs endpoint error handling update --- beacon_node/http_api/src/lib.rs | 98 +++++++++++++++++++-------------- common/eth2/src/types.rs | 25 +++++++++ common/warp_utils/src/reject.rs | 27 ++++++++- 3 files changed, 109 insertions(+), 41 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 722c1cc6085..afc29937fcd 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1422,7 +1422,6 @@ pub fn serve( chain: Arc>, beacon_proposer_cache: Arc>| { blocking_json_task(move || { - let current_epoch = chain .epoch() .map_err(warp_utils::reject::beacon_chain_error)?; @@ -1452,8 +1451,13 @@ pub fn serve( .map_err(warp_utils::reject::beacon_state_error) .and_then(|i| { let pubkey = - chain.validator_pubkey(i).map_err(warp_utils::reject::beacon_chain_error)? - .ok_or_else(|| warp_utils::reject::beacon_chain_error(BeaconChainError::ValidatorPubkeyCacheIncomplete(i)) )?; + chain.validator_pubkey(i) + .map_err(warp_utils::reject::beacon_chain_error)? + .ok_or_else(|| + warp_utils::reject::beacon_chain_error( + BeaconChainError::ValidatorPubkeyCacheIncomplete(i) + ) + )?; Ok(api_types::ProposerData { pubkey: PublicKeyBytes::from(pubkey), @@ -1710,17 +1714,25 @@ pub fn serve( .and(chain_filter.clone()) .and(warp::body::json()) .and(network_tx_filter.clone()) + .and(log_filter.clone()) .and_then( |chain: Arc>, aggregates: Vec>, - network_tx: UnboundedSender>| { + network_tx: UnboundedSender>, log: Logger| { blocking_json_task(move || { - let mut verified_aggregates = Vec::new(); + let mut verified_aggregates = Vec::with_capacity(aggregates.len()); + let mut messages = Vec::with_capacity(aggregates.len()); + let mut failures = Vec::new(); // Verify that all messages in the post are valid before processing further - for aggregate in aggregates.as_slice() { + for (index, aggregate) in aggregates.as_slice().iter().enumerate() { match chain.verify_aggregated_attestation_for_gossip(aggregate.clone()) { - Ok(verified_aggregate) => verified_aggregates.push(verified_aggregate), + Ok(verified_aggregate) => { + messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new( + verified_aggregate.aggregate().clone(), + ))); + verified_aggregates.push((index, verified_aggregate)); + } // If we already know the attestation, don't broadcast it or attempt to // further verify it. Return success. // @@ -1729,48 +1741,54 @@ pub fn serve( // node. Err(AttnError::AttestationAlreadyKnown(_)) => continue, Err(e) => { - return Err(warp_utils::reject::object_invalid(format!( - "gossip verification failed: {:?}", - e - ))); - } + error!(log, + "failure verifying aggregate and proofs"; + "error" => format!("{:?}", e), + "request_index" => index, + "aggregator_index" => aggregate.message.aggregator_index, + "attestation_index" => aggregate.message.aggregate.data.index, + "attestation_slot" => aggregate.message.aggregate.data.slot, + ); + failures.push(api_types::Failure::new(index, format!("{:?}", e))); + }, } } - let messages: Vec> = verified_aggregates - .iter() - .map(|verified_aggregate| { - PubsubMessage::AggregateAndProofAttestation(Box::new( - verified_aggregate.aggregate().clone(), - )) - }) - .collect(); - + // Publish aggregate attestations to the libp2p network if !messages.is_empty() { publish_network_message(&network_tx, NetworkMessage::Publish { messages })?; } - for verified_aggregate in verified_aggregates { - chain - .apply_attestation_to_fork_choice(&verified_aggregate) - .map_err(|e| { - warp_utils::reject::broadcast_without_import(format!( - "not applied to fork choice: {:?}", - e - )) - })?; - - chain - .add_to_block_inclusion_pool(verified_aggregate) - .map_err(|e| { - warp_utils::reject::broadcast_without_import(format!( - "not applied to block inclusion pool: {:?}", - e - )) - })?; + // Import aggregate attestations + for (index, verified_aggregate) in verified_aggregates { + if let Err(e) = chain.apply_attestation_to_fork_choice(&verified_aggregate) { + error!(log, + "failure applying verified aggregate attestation to fork choice"; + "error" => format!("{:?}", e), + "request_index" => index, + "aggregator_index" => verified_aggregate.aggregate().message.aggregator_index, + "attestation_index" => verified_aggregate.attestation().data.index, + "attestation_slot" => verified_aggregate.attestation().data.slot, + ); + failures.push(api_types::Failure::new(index, format!("{:?}", e))); + } + if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { + warn!(log, + "could not add verified aggregate attestation to the inclusion pool"; + "error" => format!("{:?}", e), + "request_index" => index, + ); + failures.push(api_types::Failure::new(index, format!("{:?}", e))); + } } - Ok(()) + if !failures.is_empty() { + Err(warp_utils::reject::indexed_bad_request("error processing aggregate and proofs".to_string(), + failures + )) + } else { + Ok(()) + } }) }, ); diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index d47cc45f4b2..b5ef41cd1d1 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -18,6 +18,31 @@ pub struct ErrorMessage { pub stacktraces: Vec, } +/// An API error serializable to JSON. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct IndexedErrorMessage { + pub code: u16, + pub message: String, + #[serde(default)] + pub failures: Vec, +} + +/// An API error serializable to JSON. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Failure { + pub index: u64, + pub message: String, +} + +impl Failure { + pub fn new(index: usize, message: String) -> Self { + Self { + index: index as u64, + message, + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct GenesisData { #[serde(with = "serde_utils::quoted_u64")] diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index 020fa19d8b8..874333a8da8 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -1,4 +1,4 @@ -use eth2::types::ErrorMessage; +use eth2::types::{ErrorMessage, Failure, IndexedErrorMessage}; use std::convert::Infallible; use warp::{http::StatusCode, reject::Reject}; @@ -110,12 +110,37 @@ pub fn invalid_auth(msg: String) -> warp::reject::Rejection { warp::reject::custom(InvalidAuthorization(msg)) } +#[derive(Debug)] +pub struct IndexedBadRequestErrors { + pub message: String, + pub failures: Vec, +} + +impl Reject for IndexedBadRequestErrors {} + +pub fn indexed_bad_request(message: String, failures: Vec) -> warp::reject::Rejection { + warp::reject::custom(IndexedBadRequestErrors { message, failures }) +} + /// This function receives a `Rejection` and tries to return a custom /// value, otherwise simply passes the rejection along. pub async fn handle_rejection(err: warp::Rejection) -> Result { let code; let message; + if let Some(e) = err.find::() { + message = format!("BAD_REQUEST: error: {}", e.message); + code = StatusCode::BAD_REQUEST; + + let json = warp::reply::json(&IndexedErrorMessage { + code: code.as_u16(), + message, + failures: e.failures.clone(), + }); + + return Ok(warp::reply::with_status(json, code)); + } + if err.is_not_found() { code = StatusCode::NOT_FOUND; message = "NOT_FOUND".to_string(); From e4a6f9b5f5c23ddc52b1bf10b79ecaab83850ddb Mon Sep 17 00:00:00 2001 From: realbigsean Date: Sat, 7 Nov 2020 15:33:09 -0500 Subject: [PATCH 17/22] validator duties download refactor --- beacon_node/http_api/tests/tests.rs | 12 +- common/eth2/src/lib.rs | 4 +- validator_client/src/attestation_service.rs | 2 +- validator_client/src/duties_service.rs | 20 ++-- validator_client/src/validator_duty.rs | 115 ++++++++++++-------- 5 files changed, 85 insertions(+), 68 deletions(-) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index d71614c8e77..0ed72a3a51f 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1306,7 +1306,7 @@ impl ApiTester { if epoch > current_epoch + 1 { assert_eq!( self.client - .post_validator_duties_attester(epoch, indices) + .post_validator_duties_attester(epoch, indices.as_slice()) .await .unwrap_err() .status() @@ -1318,7 +1318,7 @@ impl ApiTester { let results = self .client - .post_validator_duties_attester(epoch, indices.clone()) + .post_validator_duties_attester(epoch, indices.as_slice()) .await .unwrap() .data; @@ -1552,7 +1552,9 @@ impl ApiTester { .client .post_validator_duties_attester( epoch, - (0..self.validator_keypairs.len() as u64).collect(), + (0..self.validator_keypairs.len() as u64) + .collect() + .as_slice(), ) .await .unwrap() @@ -1627,7 +1629,7 @@ impl ApiTester { let aggregate = self.get_aggregate().await; self.client - .post_validator_aggregate_and_proof::(vec![aggregate]) + .post_validator_aggregate_and_proof::(vec![aggregate].as_slice()) .await .unwrap(); @@ -1642,7 +1644,7 @@ impl ApiTester { aggregate.message.aggregate.data.slot += 1; self.client - .post_validator_aggregate_and_proof::(vec![aggregate]) + .post_validator_aggregate_and_proof::(vec![aggregate].as_slice()) .await .unwrap_err(); diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index fff985ae23b..9094268c78d 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -856,7 +856,7 @@ impl BeaconNodeHttpClient { pub async fn post_validator_duties_attester( &self, epoch: Epoch, - indices: Vec, + indices: &[u64], ) -> Result>, Error> { let mut path = self.eth_path()?; @@ -873,7 +873,7 @@ impl BeaconNodeHttpClient { /// `POST validator/aggregate_and_proofs` pub async fn post_validator_aggregate_and_proof( &self, - aggregates: Vec>, + aggregates: &[SignedAggregateAndProof], ) -> Result<(), Error> { let mut path = self.eth_path()?; diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index fbc80959db6..f4f4eb636ee 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -480,7 +480,7 @@ impl AttestationService { if !signed_aggregate_and_proofs.is_empty() { match self .beacon_node - .post_validator_aggregate_and_proof(signed_aggregate_and_proofs.clone()) + .post_validator_aggregate_and_proof(signed_aggregate_and_proofs.as_slice()) .await { Ok(()) => { diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index f6d58c4e21b..2854571f634 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -602,20 +602,14 @@ impl DutiesService { // Determine which pubkeys we already know the index of by checking the duties store for // the current epoch. - let mut unknown_pubkeys = Vec::new(); - let known_pubkeys = self + let pubkeys: Vec<(PublicKey, Option)> = self .validator_store .voting_pubkeys() .into_iter() - .filter_map( - |pubkey| match self.store.get_index(&pubkey, current_epoch) { - Some(index) => Some((pubkey, index)), - None => { - unknown_pubkeys.push(pubkey); - None - } - }, - ) + .map(|pubkey| { + let index = self.store.get_index(&pubkey, current_epoch); + (pubkey, index) + }) .collect(); let mut validator_subscriptions = vec![]; @@ -623,8 +617,8 @@ impl DutiesService { &self.beacon_node, current_epoch, request_epoch, - known_pubkeys, - unknown_pubkeys.as_slice(), + pubkeys.as_slice(), + &log, ) .await { diff --git a/validator_client/src/validator_duty.rs b/validator_client/src/validator_duty.rs index e90eb8ef076..7d08b2c6d63 100644 --- a/validator_client/src/validator_duty.rs +++ b/validator_client/src/validator_duty.rs @@ -1,9 +1,9 @@ -use eth2::types::AttesterData; use eth2::{ types::{BeaconCommitteeSubscription, StateId, ValidatorId}, BeaconNodeHttpClient, }; use serde::{Deserialize, Serialize}; +use slog::{error, Logger}; use std::collections::HashMap; use types::{CommitteeIndex, Epoch, PublicKey, PublicKeyBytes, Slot}; @@ -55,42 +55,39 @@ impl ValidatorDuty { beacon_node: &BeaconNodeHttpClient, current_epoch: Epoch, request_epoch: Epoch, - mut known_pubkeys: Vec<(PublicKey, u64)>, - unknown_pubkeys: &[PublicKey], + pubkeys: &[(PublicKey, Option)], + log: &Logger, ) -> Result, String> { - let mut duties = Vec::new(); + let mut duties = Vec::with_capacity(pubkeys.len()); + let mut query_indices = Vec::with_capacity(pubkeys.len()); + let mut query_pubkeys = Vec::with_capacity(pubkeys.len()); - // Query for any pubkeys we don't know the index for in the current epoch. - for pubkey in unknown_pubkeys { - let pubkey_bytes = PublicKeyBytes::from(pubkey); - if let Some(index) = beacon_node - .get_beacon_states_validator_id( - StateId::Head, - &ValidatorId::PublicKey(pubkey_bytes.clone()), - ) - .await - .map_err(|e| format!("Failed to get validator index: {}", e))? - .map(|body| body.data.index) - { - known_pubkeys.push((pubkey.clone(), index)); + for (pubkey, index_opt) in pubkeys { + if let Some(index) = index_opt { + // If we know the index already, include it in the duties query + query_indices.push(*index); + query_pubkeys.push(pubkey.clone()); } else { - duties.push(Self::no_duties(pubkey.clone(), None)) + // Query for any pubkeys we don't know the index for in the current epoch. + let pubkey_bytes = PublicKeyBytes::from(pubkey); + if let Some(index) = beacon_node + .get_beacon_states_validator_id( + StateId::Head, + &ValidatorId::PublicKey(pubkey_bytes.clone()), + ) + .await + .map_err(|e| format!("Failed to get validator index: {}", e))? + .map(|body| body.data.index) + { + query_indices.push(index); + query_pubkeys.push(pubkey.clone()); + } else { + // If we still don't know the index, add an empty duty + duties.push(Self::no_duties(pubkey.clone(), None)); + } } } - // Query attester duties for known indices, and map the response by index. - let attester_data_by_index: HashMap = beacon_node - .post_validator_duties_attester( - request_epoch, - known_pubkeys.iter().map(|(_, index)| *index).collect(), - ) - .await - .map_err(|e| format!("Failed to get attester duties: {}", e))? - .data - .into_iter() - .map(|data| (data.validator_index, data)) - .collect(); - // Query for all block proposer duties in the current epoch and map the response by index. let proposal_slots_by_index: HashMap> = if current_epoch == request_epoch { beacon_node @@ -109,22 +106,46 @@ impl ValidatorDuty { HashMap::new() }; - for (pubkey, index) in known_pubkeys { - if let Some(attester_data) = attester_data_by_index.get(&index) { - duties.push(ValidatorDuty { - validator_pubkey: pubkey, - validator_index: Some(attester_data.validator_index), - attestation_slot: Some(attester_data.slot), - attestation_committee_index: Some(attester_data.committee_index), - attestation_committee_position: Some( - attester_data.validator_committee_index as usize, - ), - committee_count_at_slot: Some(attester_data.committees_at_slot), - committee_length: Some(attester_data.committee_length), - block_proposal_slots: proposal_slots_by_index.get(&index).cloned(), - }); - } else { - duties.push(Self::no_duties(pubkey, Some(index))) + // Query attester duties for known indices, add duties to our duty `Vec` and map the result + // to validator indices. We track indices so we can determine which validator indices from + // our query yielded results. + let returned_indices: Vec = beacon_node + .post_validator_duties_attester( + request_epoch, + query_indices.as_slice(), + ) + .await + .map_err(|e| format!("Failed to get attester duties: {}", e))? + .data + .into_iter() + .filter_map(|attester_data| { + match attester_data.pubkey.decompress() { + Ok(pubkey) => { + duties.push(ValidatorDuty { + validator_pubkey: pubkey, + validator_index: Some(attester_data.validator_index), + attestation_slot: Some(attester_data.slot), + attestation_committee_index: Some(attester_data.committee_index), + attestation_committee_position: Some( + attester_data.validator_committee_index as usize, + ), + committee_count_at_slot: Some(attester_data.committees_at_slot), + committee_length: Some(attester_data.committee_length), + block_proposal_slots: proposal_slots_by_index.get(&attester_data.validator_index).cloned(), + }); + Some(attester_data.validator_index) + } + Err(e) => { + error!(log, "Could not deserialize validator public key"; "error" => format!("{:?}", e), "validator_index" => attester_data.validator_index); + None + } + } + }).collect(); + + // Compare queried validators with results, and add empty duties where necessary. + for (index, pubkey) in query_indices.into_iter().zip(query_pubkeys.into_iter()) { + if !returned_indices.contains(&index) { + duties.push(Self::no_duties(pubkey, Some(index))); } } Ok(duties) From 016c105c5c20f6bc207cb96b03a05c12010433d0 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Sat, 7 Nov 2020 16:06:24 -0500 Subject: [PATCH 18/22] fix test compile error --- beacon_node/http_api/tests/tests.rs | 2 +- validator_client/src/validator_duty.rs | 55 ++++++++++++++------------ 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 0ed72a3a51f..742bdecaa50 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1553,7 +1553,7 @@ impl ApiTester { .post_validator_duties_attester( epoch, (0..self.validator_keypairs.len() as u64) - .collect() + .collect::>() .as_slice(), ) .await diff --git a/validator_client/src/validator_duty.rs b/validator_client/src/validator_duty.rs index 7d08b2c6d63..7f68df82a57 100644 --- a/validator_client/src/validator_duty.rs +++ b/validator_client/src/validator_duty.rs @@ -110,37 +110,40 @@ impl ValidatorDuty { // to validator indices. We track indices so we can determine which validator indices from // our query yielded results. let returned_indices: Vec = beacon_node - .post_validator_duties_attester( - request_epoch, - query_indices.as_slice(), - ) + .post_validator_duties_attester(request_epoch, query_indices.as_slice()) .await .map_err(|e| format!("Failed to get attester duties: {}", e))? .data .into_iter() - .filter_map(|attester_data| { - match attester_data.pubkey.decompress() { - Ok(pubkey) => { - duties.push(ValidatorDuty { - validator_pubkey: pubkey, - validator_index: Some(attester_data.validator_index), - attestation_slot: Some(attester_data.slot), - attestation_committee_index: Some(attester_data.committee_index), - attestation_committee_position: Some( - attester_data.validator_committee_index as usize, - ), - committee_count_at_slot: Some(attester_data.committees_at_slot), - committee_length: Some(attester_data.committee_length), - block_proposal_slots: proposal_slots_by_index.get(&attester_data.validator_index).cloned(), - }); - Some(attester_data.validator_index) - } - Err(e) => { - error!(log, "Could not deserialize validator public key"; "error" => format!("{:?}", e), "validator_index" => attester_data.validator_index); - None - } + .filter_map(|attester_data| match attester_data.pubkey.decompress() { + Ok(pubkey) => { + duties.push(ValidatorDuty { + validator_pubkey: pubkey, + validator_index: Some(attester_data.validator_index), + attestation_slot: Some(attester_data.slot), + attestation_committee_index: Some(attester_data.committee_index), + attestation_committee_position: Some( + attester_data.validator_committee_index as usize, + ), + committee_count_at_slot: Some(attester_data.committees_at_slot), + committee_length: Some(attester_data.committee_length), + block_proposal_slots: proposal_slots_by_index + .get(&attester_data.validator_index) + .cloned(), + }); + Some(attester_data.validator_index) } - }).collect(); + Err(e) => { + error!( + log, + "Could not deserialize validator public key"; + "error" => format!("{:?}", e), + "validator_index" => attester_data.validator_index + ); + None + } + }) + .collect(); // Compare queried validators with results, and add empty duties where necessary. for (index, pubkey) in query_indices.into_iter().zip(query_pubkeys.into_iter()) { From 9cacac5ebf77a2dc929a05374ddb26828c402b7b Mon Sep 17 00:00:00 2001 From: realbigsean Date: Sat, 7 Nov 2020 19:30:52 -0500 Subject: [PATCH 19/22] fix indexed error deserialization --- beacon_node/http_api/tests/tests.rs | 4 ++-- common/eth2/src/lib.rs | 26 +++++++++++++++++++++++++- common/eth2/src/types.rs | 5 ++--- common/warp_utils/src/reject.rs | 2 +- 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 742bdecaa50..54b9c5e7233 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1629,7 +1629,7 @@ impl ApiTester { let aggregate = self.get_aggregate().await; self.client - .post_validator_aggregate_and_proof::(vec![aggregate].as_slice()) + .post_validator_aggregate_and_proof::(&[aggregate]) .await .unwrap(); @@ -1644,7 +1644,7 @@ impl ApiTester { aggregate.message.aggregate.data.slot += 1; self.client - .post_validator_aggregate_and_proof::(vec![aggregate].as_slice()) + .post_validator_aggregate_and_proof::(&[aggregate]) .await .unwrap_err(); diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 9094268c78d..81b55dab19b 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -28,6 +28,8 @@ pub enum Error { Reqwest(reqwest::Error), /// The server returned an error message where the body was able to be parsed. ServerMessage(ErrorMessage), + /// The server returned an error message with an array of errors. + ServerIndexedMessage(IndexedErrorMessage), /// The server returned an error message where the body was unable to be parsed. StatusCode(StatusCode), /// The supplied URL is badly formatted. It should look something like `http://127.0.0.1:5052`. @@ -50,6 +52,7 @@ impl Error { match self { Error::Reqwest(error) => error.status(), Error::ServerMessage(msg) => StatusCode::try_from(msg.code).ok(), + Error::ServerIndexedMessage(msg) => StatusCode::try_from(msg.code).ok(), Error::StatusCode(status) => Some(*status), Error::InvalidUrl(_) => None, Error::InvalidSecret(_) => None, @@ -882,7 +885,14 @@ impl BeaconNodeHttpClient { .push("validator") .push("aggregate_and_proofs"); - self.post(path, &aggregates).await?; + let response = self + .client + .post(path) + .json(aggregates) + .send() + .await + .map_err(Error::Reqwest)?; + ok_or_indexed_error(response).await?; Ok(()) } @@ -918,3 +928,17 @@ async fn ok_or_error(response: Response) -> Result { Err(Error::StatusCode(status)) } } + +/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an +/// appropriate indexed error message. +async fn ok_or_indexed_error(response: Response) -> Result { + let status = response.status(); + + if status == StatusCode::OK { + Ok(response) + } else if let Ok(message) = response.json().await { + Err(Error::ServerIndexedMessage(message)) + } else { + Err(Error::StatusCode(status)) + } +} diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index b5ef41cd1d1..066dfc170f8 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -18,16 +18,15 @@ pub struct ErrorMessage { pub stacktraces: Vec, } -/// An API error serializable to JSON. +/// An indexed API error serializable to JSON. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct IndexedErrorMessage { pub code: u16, pub message: String, - #[serde(default)] pub failures: Vec, } -/// An API error serializable to JSON. +/// A single failure in an index of API errors, serializable to JSON. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Failure { pub index: u64, diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index 874333a8da8..9a5a8ea5cfd 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -129,7 +129,7 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result() { - message = format!("BAD_REQUEST: error: {}", e.message); + message = format!("BAD_REQUEST: {}", e.message); code = StatusCode::BAD_REQUEST; let json = warp::reply::json(&IndexedErrorMessage { From 7895c504fb9e522e91c081d70ac03767d9d68432 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 9 Nov 2020 09:21:40 -0500 Subject: [PATCH 20/22] log updates --- beacon_node/http_api/src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index afc29937fcd..26f640ff0b9 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1742,14 +1742,14 @@ pub fn serve( Err(AttnError::AttestationAlreadyKnown(_)) => continue, Err(e) => { error!(log, - "failure verifying aggregate and proofs"; + "Failure verifying aggregate and proofs"; "error" => format!("{:?}", e), "request_index" => index, "aggregator_index" => aggregate.message.aggregator_index, "attestation_index" => aggregate.message.aggregate.data.index, "attestation_slot" => aggregate.message.aggregate.data.slot, ); - failures.push(api_types::Failure::new(index, format!("{:?}", e))); + failures.push(api_types::Failure::new(index, format!("Verification: {:?}", e))); }, } } @@ -1763,22 +1763,22 @@ pub fn serve( for (index, verified_aggregate) in verified_aggregates { if let Err(e) = chain.apply_attestation_to_fork_choice(&verified_aggregate) { error!(log, - "failure applying verified aggregate attestation to fork choice"; + "Failure applying verified aggregate attestation to fork choice"; "error" => format!("{:?}", e), "request_index" => index, "aggregator_index" => verified_aggregate.aggregate().message.aggregator_index, "attestation_index" => verified_aggregate.attestation().data.index, "attestation_slot" => verified_aggregate.attestation().data.slot, ); - failures.push(api_types::Failure::new(index, format!("{:?}", e))); + failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e))); } if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { warn!(log, - "could not add verified aggregate attestation to the inclusion pool"; + "Could not add verified aggregate attestation to the inclusion pool"; "error" => format!("{:?}", e), "request_index" => index, ); - failures.push(api_types::Failure::new(index, format!("{:?}", e))); + failures.push(api_types::Failure::new(index, format!("Op pool: {:?}", e))); } } From 28d159d29fbc185123c680611f8bbc3570d8e7b6 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 9 Nov 2020 18:11:41 +1100 Subject: [PATCH 21/22] Change duties endpoint --- validator_client/src/duties_service.rs | 2 +- validator_client/src/validator_duty.rs | 161 ++++++++++++++----------- 2 files changed, 92 insertions(+), 71 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 2854571f634..8bba3930074 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -617,7 +617,7 @@ impl DutiesService { &self.beacon_node, current_epoch, request_epoch, - pubkeys.as_slice(), + pubkeys, &log, ) .await diff --git a/validator_client/src/validator_duty.rs b/validator_client/src/validator_duty.rs index 7f68df82a57..357ffb666b3 100644 --- a/validator_client/src/validator_duty.rs +++ b/validator_client/src/validator_duty.rs @@ -55,36 +55,29 @@ impl ValidatorDuty { beacon_node: &BeaconNodeHttpClient, current_epoch: Epoch, request_epoch: Epoch, - pubkeys: &[(PublicKey, Option)], + mut pubkeys: Vec<(PublicKey, Option)>, log: &Logger, ) -> Result, String> { - let mut duties = Vec::with_capacity(pubkeys.len()); - let mut query_indices = Vec::with_capacity(pubkeys.len()); - let mut query_pubkeys = Vec::with_capacity(pubkeys.len()); - - for (pubkey, index_opt) in pubkeys { - if let Some(index) = index_opt { - // If we know the index already, include it in the duties query - query_indices.push(*index); - query_pubkeys.push(pubkey.clone()); - } else { - // Query for any pubkeys we don't know the index for in the current epoch. - let pubkey_bytes = PublicKeyBytes::from(pubkey); - if let Some(index) = beacon_node + for (pubkey, index_opt) in &mut pubkeys { + if index_opt.is_none() { + *index_opt = beacon_node .get_beacon_states_validator_id( StateId::Head, - &ValidatorId::PublicKey(pubkey_bytes.clone()), + &ValidatorId::PublicKey(PublicKeyBytes::from(&*pubkey)), ) .await - .map_err(|e| format!("Failed to get validator index: {}", e))? - .map(|body| body.data.index) - { - query_indices.push(index); - query_pubkeys.push(pubkey.clone()); - } else { - // If we still don't know the index, add an empty duty - duties.push(Self::no_duties(pubkey.clone(), None)); - } + .map_err(|e| { + error!( + log, + "Failed to obtain validator index"; + "pubkey" => ?pubkey, + "error" => ?e + ) + }) + // Supress the error since we've already logged an error and we don't want to + // stop the rest of the code. + .ok() + .and_then(|body_opt| body_opt.map(|body| body.data.index)); } } @@ -93,64 +86,92 @@ impl ValidatorDuty { beacon_node .get_validator_duties_proposer(current_epoch) .await - .map_err(|e| format!("Failed to get proposer indices: {}", e))? - .data + .map(|resp| resp.data) + // Exit early if there's an error. + .map_err(|e| format!("Failed to get proposer indices: {:?}", e))? .into_iter() - .fold(HashMap::new(), |mut map, proposer_data| { - map.entry(proposer_data.validator_index) - .or_insert_with(Vec::new) - .push(proposer_data.slot); - map - }) + .fold( + HashMap::with_capacity(pubkeys.len()), + |mut map, proposer_data| { + map.entry(proposer_data.validator_index) + .or_insert_with(Vec::new) + .push(proposer_data.slot); + map + }, + ) } else { HashMap::new() }; - // Query attester duties for known indices, add duties to our duty `Vec` and map the result - // to validator indices. We track indices so we can determine which validator indices from - // our query yielded results. - let returned_indices: Vec = beacon_node + let query_indices = pubkeys + .iter() + .filter_map(|(_, index_opt)| *index_opt) + .collect::>(); + let attester_data_map = beacon_node .post_validator_duties_attester(request_epoch, query_indices.as_slice()) .await - .map_err(|e| format!("Failed to get attester duties: {}", e))? - .data + .map(|resp| resp.data) + // Exit early if there's an error. + .map_err(|e| format!("Failed to get attester duties: {:?}", e))? .into_iter() - .filter_map(|attester_data| match attester_data.pubkey.decompress() { - Ok(pubkey) => { - duties.push(ValidatorDuty { - validator_pubkey: pubkey, - validator_index: Some(attester_data.validator_index), - attestation_slot: Some(attester_data.slot), - attestation_committee_index: Some(attester_data.committee_index), - attestation_committee_position: Some( - attester_data.validator_committee_index as usize, - ), - committee_count_at_slot: Some(attester_data.committees_at_slot), - committee_length: Some(attester_data.committee_length), - block_proposal_slots: proposal_slots_by_index - .get(&attester_data.validator_index) - .cloned(), - }); - Some(attester_data.validator_index) - } - Err(e) => { - error!( - log, - "Could not deserialize validator public key"; - "error" => format!("{:?}", e), - "validator_index" => attester_data.validator_index - ); - None + .fold( + HashMap::with_capacity(pubkeys.len()), + |mut map, attester_data| { + map.insert(attester_data.validator_index, attester_data); + map + }, + ); + + let duties = pubkeys + .into_iter() + .map(|(pubkey, index_opt)| { + if let Some(index) = index_opt { + if let Some(attester_data) = attester_data_map.get(&index) { + if attester_data.validator_index != index { + error!( + log, + "Validator index mismatch"; + "local" => index, + "remote" => attester_data.validator_index + ); + + return Self::no_duties(pubkey, None); + } + + match attester_data.pubkey.decompress() { + Ok(pubkey) => ValidatorDuty { + validator_pubkey: pubkey, + validator_index: Some(attester_data.validator_index), + attestation_slot: Some(attester_data.slot), + attestation_committee_index: Some(attester_data.committee_index), + attestation_committee_position: Some( + attester_data.validator_committee_index as usize, + ), + committee_count_at_slot: Some(attester_data.committees_at_slot), + committee_length: Some(attester_data.committee_length), + block_proposal_slots: proposal_slots_by_index + .get(&attester_data.validator_index) + .cloned(), + }, + Err(e) => { + error!( + log, + "Could not deserialize validator public key"; + "error" => format!("{:?}", e), + "validator_index" => attester_data.validator_index + ); + Self::no_duties(pubkey, Some(index)) + } + } + } else { + Self::no_duties(pubkey, Some(index)) + } + } else { + Self::no_duties(pubkey, None) } }) .collect(); - // Compare queried validators with results, and add empty duties where necessary. - for (index, pubkey) in query_indices.into_iter().zip(query_pubkeys.into_iter()) { - if !returned_indices.contains(&index) { - duties.push(Self::no_duties(pubkey, Some(index))); - } - } Ok(duties) } From 884929329bb4facbefd0cc2617de10540215168e Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 9 Nov 2020 09:52:08 -0500 Subject: [PATCH 22/22] remove index check --- validator_client/src/validator_duty.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/validator_client/src/validator_duty.rs b/validator_client/src/validator_duty.rs index 357ffb666b3..b87a9dbc34f 100644 --- a/validator_client/src/validator_duty.rs +++ b/validator_client/src/validator_duty.rs @@ -127,17 +127,6 @@ impl ValidatorDuty { .map(|(pubkey, index_opt)| { if let Some(index) = index_opt { if let Some(attester_data) = attester_data_map.get(&index) { - if attester_data.validator_index != index { - error!( - log, - "Validator index mismatch"; - "local" => index, - "remote" => attester_data.validator_index - ); - - return Self::no_duties(pubkey, None); - } - match attester_data.pubkey.decompress() { Ok(pubkey) => ValidatorDuty { validator_pubkey: pubkey,