Skip to content

Commit

Permalink
Merge branch 'master' into prefetch-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
pugachAG authored Dec 12, 2022
2 parents 7d0a771 + 56cc704 commit d08d304
Show file tree
Hide file tree
Showing 22 changed files with 197 additions and 214 deletions.
4 changes: 4 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ who belongs either to the super owners or the normal owners group.
well. They will review your tests, and make sure that they can convince
themselves the test coverage is adequate before they even look into the
change, so make sure you tested all the corner cases.
- it is normal to sometimes require multiple rounds of reviews to get a PR
merged. If your PR received some feedback from a reviewer, use the [github
UI](https://stackoverflow.com/questions/40893008/how-to-resume-review-process-after-updating-pull-request-at-github)
to re-request a review.

The author is also free to directly request reviews from specific persons
[through the github
Expand Down
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ sandbox-release: neard-sandbox-release
neard-sandbox-release:
cargo build -p neard --features sandbox --release

shardnet-release:
cargo build -p neard --release --features shardnet


.PHONY: docker-nearcore docker-nearcore-nightly release neard debug
.PHONY: perf-release perf-debug nightly-release nightly-debug assertions-release sandbox
Expand Down
2 changes: 0 additions & 2 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ no_cache = ["near-store/no_cache"]
protocol_feature_flat_state = ["near-store/protocol_feature_flat_state"]
protocol_feature_reject_blocks_with_outdated_protocol_version = ["near-primitives/protocol_feature_reject_blocks_with_outdated_protocol_version"]

shardnet = ["protocol_feature_reject_blocks_with_outdated_protocol_version"]

nightly = [
"nightly_protocol",
]
Expand Down
2 changes: 0 additions & 2 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,3 @@ sandbox = [
"near-client-primitives/sandbox",
"near-chain/sandbox",
]
# Shardnet is the experimental network that we deploy for chunk-only producer testing.
shardnet = ["near-network/shardnet"]
3 changes: 0 additions & 3 deletions chain/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,3 @@ near-o11y = { path = "../../core/o11y" }
near-primitives = { path = "../../core/primitives" }
near-store = { path = "../../core/store" }
node-runtime = { path = "../../runtime/runtime" }

[features]
shardnet = ["nearcore/shardnet", "near-client/shardnet", "near-primitives/shardnet"]
2 changes: 0 additions & 2 deletions chain/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,3 @@ performance_stats = [
"near-rust-allocator-proxy",
]
test_features = []

shardnet = []
8 changes: 1 addition & 7 deletions chain/network/src/config_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,7 @@ fn default_peer_expiration_duration() -> Duration {

// If non-zero - we'll skip sending tombstones during initial sync and for that many seconds after start.
fn default_skip_tombstones() -> i64 {
// Enable by default in shardnet only.
if cfg!(feature = "shardnet") {
// Skip sending tombstones during sync and 240 seconds after start.
240
} else {
0
}
0
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub(crate) enum ClosingReason {
#[error("Received a message of type not allowed on this connection.")]
DisallowedMessage,
#[error("PeerManager requested to close the connection")]
PeerManager,
PeerManagerRequest,
#[error("Received DisconnectMessage from peer")]
DisconnectMessage,
#[error("Peer clock skew exceeded {MAX_CLOCK_SKEW}")]
Expand Down Expand Up @@ -1551,7 +1551,7 @@ impl actix::Handler<WithSpanContext<Stop>> for PeerActor {
ctx,
match msg.ban_reason {
Some(reason) => ClosingReason::Ban(reason),
None => ClosingReason::PeerManager,
None => ClosingReason::PeerManagerRequest,
},
);
}
Expand Down
9 changes: 1 addition & 8 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,7 @@ impl PeerManagerActor {
let _timer =
metrics::PEER_MANAGER_TRIGGER_TIME.with_label_values(&["monitor_peers"]).start_timer();

self.state.peer_store.unban(&self.clock);
if let Err(err) = self.state.peer_store.update_connected_peers_last_seen(&self.clock) {
tracing::error!(target: "network", ?err, "Failed to update peers last seen time.");
}
self.state.peer_store.update(&self.clock);

if self.is_outbound_bootstrap_needed() {
let tier2 = self.state.tier2.load();
Expand Down Expand Up @@ -547,10 +544,6 @@ impl PeerManagerActor {
// If there are too many active connections try to remove some connections
self.maybe_stop_active_connection();

if let Err(err) = self.state.peer_store.remove_expired(&self.clock) {
tracing::error!(target: "network", ?err, "Failed to remove expired peers");
};

// Find peers that are not reliable (too much behind) - and make sure that we're not routing messages through them.
let unreliable_peers = self.unreliable_peers();
metrics::PEER_UNRELIABLE.set(unreliable_peers.len() as i64);
Expand Down
165 changes: 81 additions & 84 deletions chain/network/src/peer_manager/peer_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ mod testonly;
#[cfg(test)]
mod tests;

/// How often to update the KnownPeerState.last_seen in storage.
const UPDATE_LAST_SEEN_INTERVAL: time::Duration = time::Duration::minutes(1);

/// Level of trust we have about a new (PeerId, Addr) pair.
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
enum TrustLevel {
Expand Down Expand Up @@ -224,16 +227,73 @@ impl Inner {
}
Ok(())
}

/// Forgets peers that have not been seen for the configured expiration period.
///
/// A peer is selected when its status is anything other than `Connected` and
/// `now` is later than its `last_seen` plus `config.peer_expiration_duration`.
/// A failure reported by `delete_peers` is logged and otherwise ignored.
fn remove_expired(&mut self, now: time::Utc) {
    let expiration = self.config.peer_expiration_duration;
    let expired: Vec<_> = self
        .peer_states
        .iter()
        .filter(|(_, state)| {
            state.status != KnownPeerStatus::Connected && now > state.last_seen + expiration
        })
        .map(|(peer_id, state)| {
            tracing::debug!(target: "network", "Removing peer: last seen {:?} ago", now-state.last_seen);
            peer_id.clone()
        })
        .collect();
    if let Err(err) = self.delete_peers(&expired) {
        tracing::error!(target: "network", ?err, "Failed to remove expired peers");
    }
}

/// Lifts the ban from every peer whose ban is at least `config.ban_window` old at `now`.
///
/// A failure reported by `peer_unban` is logged for that peer and does not
/// stop the remaining peers from being processed.
fn unban(&mut self, now: time::Utc) {
    let ban_window = self.config.ban_window;
    // Gather the ids first so that `peer_states` is no longer borrowed when
    // `peer_unban` is invoked on `self`.
    let expired_bans: Vec<_> = self
        .peer_states
        .iter()
        .filter_map(|(peer_id, peer_state)| match peer_state.status {
            KnownPeerStatus::Banned(_, ban_time) if now >= ban_time + ban_window => {
                tracing::info!(target: "network", unbanned = ?peer_id, ?ban_time, "unbanning a peer");
                Some(peer_id.clone())
            }
            _ => None,
        })
        .collect();
    for peer_id in &expired_bans {
        if let Err(err) = self.peer_unban(peer_id) {
            tracing::error!(target: "network", ?peer_id, ?err, "Failed to unban a peer");
        }
    }
}

/// Refreshes `last_seen` for the peers we are currently connected to.
///
/// An entry is rewritten, both in memory and in `store`, only when `now` is
/// more than `UPDATE_LAST_SEEN_INTERVAL` past its previous `last_seen`.
/// A store failure is logged for that peer and the pass continues.
fn update_last_seen(&mut self, now: time::Utc) {
    let stale_connected = self.peer_states.iter_mut().filter(|(_, state)| {
        state.status == KnownPeerStatus::Connected
            && now > state.last_seen + UPDATE_LAST_SEEN_INTERVAL
    });
    for (peer_id, state) in stale_connected {
        state.last_seen = now;
        if let Err(err) = self.store.set_peer_state(peer_id, state) {
            tracing::error!(target: "network", ?peer_id, ?err, "Failed to update peers last seen time.");
        }
    }
}

/// Performs the time-driven maintenance of the PeerStore. In order:
/// 1. peers whose ban is at least `config.ban_window` old are unbanned,
/// 2. `last_seen` of the connected peers is refreshed,
/// 3. peers that are not connected and were not seen for
///    `config.peer_expiration_duration` are removed.
///
/// This function should be called periodically.
pub fn update(&mut self, clock: &time::Clock) {
    // The clock is read once; all three steps work with the same timestamp.
    // TODO(gprusak): these operations could be put into a single DB write transaction.
    let now = clock.now_utc();
    self.unban(now);
    self.update_last_seen(now);
    self.remove_expired(now);
}
}

pub(crate) struct PeerStore(Mutex<Inner>);

impl PeerStore {
pub(crate) fn new(
clock: &time::Clock,
config: Config,
store: store::Store,
) -> anyhow::Result<Self> {
pub fn new(clock: &time::Clock, config: Config, store: store::Store) -> anyhow::Result<Self> {
let boot_nodes: HashSet<_> = config.boot_nodes.iter().map(|p| p.id.clone()).collect();
// A mapping from `PeerId` to `KnownPeerState`.
let mut peerid_2_state = HashMap::default();
Expand Down Expand Up @@ -345,29 +405,29 @@ impl PeerStore {
self.0.lock().config.blacklist.contains(*addr)
}

pub(crate) fn len(&self) -> usize {
pub fn len(&self) -> usize {
self.0.lock().peer_states.len()
}

pub(crate) fn is_banned(&self, peer_id: &PeerId) -> bool {
pub fn is_banned(&self, peer_id: &PeerId) -> bool {
self.0.lock().peer_states.get(peer_id).map_or(false, |s| s.status.is_banned())
}

pub(crate) fn count_banned(&self) -> usize {
pub fn count_banned(&self) -> usize {
self.0.lock().peer_states.values().filter(|st| st.status.is_banned()).count()
}

/// Locks the inner state and runs its periodic maintenance pass
/// (`Inner::update`) with the given clock.
pub fn update(&self, clock: &time::Clock) {
    let mut inner = self.0.lock();
    inner.update(clock);
}

#[allow(dead_code)]
/// Returns the state of the current peer in memory.
pub(crate) fn get_peer_state(&self, peer_id: &PeerId) -> Option<KnownPeerState> {
pub fn get_peer_state(&self, peer_id: &PeerId) -> Option<KnownPeerState> {
self.0.lock().peer_states.get(peer_id).cloned()
}

pub(crate) fn peer_connected(
&self,
clock: &time::Clock,
peer_info: &PeerInfo,
) -> anyhow::Result<()> {
pub fn peer_connected(&self, clock: &time::Clock, peer_info: &PeerInfo) -> anyhow::Result<()> {
let mut inner = self.0.lock();
inner.add_signed_peer(clock, peer_info.clone())?;
let mut store = inner.store.clone();
Expand All @@ -377,29 +437,7 @@ impl PeerStore {
Ok(store.set_peer_state(&peer_info.id, entry)?)
}

/// Bumps `last_seen` to the current time for each connected peer whose
/// recorded value is more than one minute old, and persists the new entry.
///
/// # Errors
/// Returns the first error reported by the store; peers that come later in
/// the iteration are then left untouched.
pub(crate) fn update_connected_peers_last_seen(
    &self,
    clock: &time::Clock,
) -> anyhow::Result<()> {
    let mut guard = self.0.lock();
    let mut store = guard.store.clone();
    for (peer_id, peer_state) in guard.peer_states.iter_mut() {
        if peer_state.status != KnownPeerStatus::Connected {
            continue;
        }
        let refresh_after = peer_state.last_seen.saturating_add(time::Duration::minutes(1));
        if clock.now_utc() > refresh_after {
            peer_state.last_seen = clock.now_utc();
            store.set_peer_state(peer_id, peer_state)?;
        }
    }
    Ok(())
}

pub(crate) fn peer_disconnected(
&self,
clock: &time::Clock,
peer_id: &PeerId,
) -> anyhow::Result<()> {
pub fn peer_disconnected(&self, clock: &time::Clock, peer_id: &PeerId) -> anyhow::Result<()> {
let mut inner = self.0.lock();
let mut store = inner.store.clone();
if let Some(peer_state) = inner.peer_states.get_mut(peer_id) {
Expand All @@ -414,7 +452,7 @@ impl PeerStore {

/// Records the last attempt to connect to peer.
/// Marks the peer as Unknown (as we failed to connect to it).
pub(crate) fn peer_connection_attempt(
pub fn peer_connection_attempt(
&self,
clock: &time::Clock,
peer_id: &PeerId,
Expand All @@ -437,7 +475,7 @@ impl PeerStore {
Ok(())
}

pub(crate) fn peer_ban(
pub fn peer_ban(
&self,
clock: &time::Clock,
peer_id: &PeerId,
Expand All @@ -459,7 +497,7 @@ impl PeerStore {

/// Return unconnected peers or peers with unknown status that we can try to connect to.
/// Peers with unknown addresses are filtered out.
pub(crate) fn unconnected_peer(
pub fn unconnected_peer(
&self,
ignore_fn: impl Fn(&KnownPeerState) -> bool,
prefer_previously_connected_peer: bool,
Expand Down Expand Up @@ -499,37 +537,20 @@ impl PeerStore {
}

/// Return healthy known peers up to given amount.
pub(crate) fn healthy_peers(&self, max_count: usize) -> Vec<PeerInfo> {
pub fn healthy_peers(&self, max_count: usize) -> Vec<PeerInfo> {
self.0
.lock()
.find_peers(|p| matches!(p.status, KnownPeerStatus::Banned(_, _)).not(), max_count)
}

/// Deletes the peers that have not been seen for longer than
/// `config.peer_expiration_duration`, skipping peers that are connected.
///
/// # Errors
/// Propagates the error returned by `delete_peers`.
pub(crate) fn remove_expired(&self, clock: &time::Clock) -> anyhow::Result<()> {
    let mut inner = self.0.lock();
    let now = clock.now_utc();
    let expired: Vec<_> = inner
        .peer_states
        .iter()
        .filter_map(|(peer_id, state)| {
            let age = now - state.last_seen;
            if state.status == KnownPeerStatus::Connected
                || age <= inner.config.peer_expiration_duration
            {
                return None;
            }
            tracing::debug!(target: "network", "Removing peer: last seen {:?} ago", age);
            Some(peer_id.clone())
        })
        .collect();
    inner.delete_peers(&expired)
}

/// Adds peers we’ve learned about from other peers.
///
/// Identities of the nodes haven’t been verified in any way. We don’t even
/// know if there is anything running at given addresses and even if there
/// are nodes there we haven’t received signatures of their peer ID.
///
/// See also [`Self::add_direct_peer`] and [`Self::add_signed_peer`].
pub(crate) fn add_indirect_peers(
pub fn add_indirect_peers(
&self,
clock: &time::Clock,
peers: impl Iterator<Item = PeerInfo>,
Expand Down Expand Up @@ -561,34 +582,10 @@ impl PeerStore {
/// confirming that identity yet.
///
/// See also [`Self::add_indirect_peers`] and [`Self::add_signed_peer`].
pub(crate) fn add_direct_peer(
&self,
clock: &time::Clock,
peer_info: PeerInfo,
) -> anyhow::Result<()> {
pub fn add_direct_peer(&self, clock: &time::Clock, peer_info: PeerInfo) -> anyhow::Result<()> {
self.0.lock().add_peer(clock, peer_info, TrustLevel::Direct)
}

/// Unbans every peer whose ban is at least `config.ban_window` old according
/// to `clock`. Failures of `peer_unban` are logged and otherwise ignored.
pub fn unban(&self, clock: &time::Clock) {
    let mut inner = self.0.lock();
    let now = clock.now_utc();
    // Gather the ids first so that `peer_states` is no longer borrowed when
    // `peer_unban` is invoked.
    let expired_bans: Vec<_> = inner
        .peer_states
        .iter()
        .filter_map(|(peer_id, peer_state)| match peer_state.status {
            KnownPeerStatus::Banned(_, ban_time) if now >= ban_time + inner.config.ban_window => {
                tracing::info!(target: "network", unbanned = ?peer_id, ?ban_time, "unbanning a peer");
                Some(peer_id.clone())
            }
            _ => None,
        })
        .collect();
    for peer_id in &expired_bans {
        if let Err(err) = inner.peer_unban(peer_id) {
            tracing::error!(target: "network", ?err, "Failed to unban a peer");
        }
    }
}

/// Returns a cloned snapshot of the in-memory map from peer id to peer state.
pub fn load(&self) -> HashMap<PeerId, KnownPeerState> {
    let inner = self.0.lock();
    inner.peer_states.clone()
}
Expand Down
Loading

0 comments on commit d08d304

Please sign in to comment.