diff --git a/.gitlab/pipeline/zombienet/polkadot.yml b/.gitlab/pipeline/zombienet/polkadot.yml
index c246d98a0489d..4112096a2ed74 100644
--- a/.gitlab/pipeline/zombienet/polkadot.yml
+++ b/.gitlab/pipeline/zombienet/polkadot.yml
@@ -139,6 +139,14 @@ zombienet-polkadot-functional-0009-approval-voting-coalescing:
       --local-dir="${LOCAL_DIR}/functional"
       --test="0009-approval-voting-coalescing.zndsl"
 
+zombienet-polkadot-functional-0010-validator-disabling:
+  extends:
+    - .zombienet-polkadot-common
+  script:
+    - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
+      --local-dir="${LOCAL_DIR}/functional"
+      --test="0010-validator-disabling.zndsl"
+
 zombienet-polkadot-smoke-0001-parachains-smoke-test:
   extends:
     - .zombienet-polkadot-common
diff --git a/polkadot/node/network/statement-distribution/src/error.rs b/polkadot/node/network/statement-distribution/src/error.rs
index b676e5b6a2235..a712ab6da436f 100644
--- a/polkadot/node/network/statement-distribution/src/error.rs
+++ b/polkadot/node/network/statement-distribution/src/error.rs
@@ -75,6 +75,9 @@ pub enum Error {
 	#[error("Fetching availability cores failed {0:?}")]
 	FetchAvailabilityCores(RuntimeApiError),
 
+	#[error("Fetching disabled validators failed {0:?}")]
+	FetchDisabledValidators(runtime::Error),
+
 	#[error("Fetching validator groups failed {0:?}")]
 	FetchValidatorGroups(RuntimeApiError),
 
diff --git a/polkadot/node/network/statement-distribution/src/v2/grid.rs b/polkadot/node/network/statement-distribution/src/v2/grid.rs
index 19bad34c44ff9..19f23053192c1 100644
--- a/polkadot/node/network/statement-distribution/src/v2/grid.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/grid.rs
@@ -253,7 +253,9 @@ impl GridTracker {
 	/// This checks whether the peer is allowed to send us manifests
 	/// about this group at this relay-parent. This also does sanity
 	/// checks on the format of the manifest and the amount of votes
-	/// it contains. It has effects on the stored state only when successful.
+	/// it contains. It assumes that the votes from disabled validators
+	/// are already filtered out.
+	/// It has effects on the stored state only when successful.
 	///
 	/// This returns a `bool` on success, which if true indicates that an acknowledgement is
 	/// to be sent in response to the received manifest. This only occurs when the
diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs
index 2f06d3685b814..02fdecdd9bb32 100644
--- a/polkadot/node/network/statement-distribution/src/v2/mod.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -17,11 +17,11 @@
 //! Implementation of the v2 statement distribution protocol,
 //! designed for asynchronous backing.
 
-use net_protocol::{filter_by_peer_version, peer_set::ProtocolVersion};
+use bitvec::prelude::{BitVec, Lsb0};
 use polkadot_node_network_protocol::{
-	self as net_protocol,
+	self as net_protocol, filter_by_peer_version,
 	grid_topology::SessionGridTopology,
-	peer_set::ValidationVersion,
+	peer_set::{ProtocolVersion, ValidationVersion},
 	request_response::{
 		incoming::OutgoingResponse,
 		v2::{AttestedCandidateRequest, AttestedCandidateResponse},
@@ -64,7 +64,7 @@ use futures::{
 use std::{
 	collections::{
 		hash_map::{Entry, HashMap},
-		HashSet,
+		BTreeSet, HashSet,
 	},
 	time::{Duration, Instant},
 };
@@ -96,6 +96,7 @@ const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement");
 const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep =
 	Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent");
 const COST_EXCESSIVE_SECONDED: Rep = Rep::CostMinor("Sent Excessive `Seconded` Statements");
+const COST_DISABLED_VALIDATOR: Rep = Rep::CostMinor("Sent a statement from a disabled validator");
 
 const COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE: Rep =
 	Rep::CostMinor("Unexpected Manifest, missing knowlege for relay parent");
@@ -189,6 +190,8 @@ struct PerSessionState {
 	// getting the topology from the gossip-support subsystem
 	grid_view: Option<grid::SessionTopologyView>,
 	local_validator: Option<LocalValidatorIndex>,
+	// We store the latest state here based on union of leaves.
+	disabled_validators: BTreeSet<ValidatorIndex>,
 }
 
 impl PerSessionState {
@@ -205,7 +208,16 @@ impl PerSessionState {
 		)
 		.map(|(_, index)| LocalValidatorIndex::Active(index));
 
-		PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator }
+		let disabled_validators = BTreeSet::new();
+
+		PerSessionState {
+			session_info,
+			groups,
+			authority_lookup,
+			grid_view: None,
+			local_validator,
+			disabled_validators,
+		}
 	}
 
 	fn supply_topology(
@@ -234,6 +246,33 @@ impl PerSessionState {
 	fn is_not_validator(&self) -> bool {
 		self.grid_view.is_some() && self.local_validator.is_none()
 	}
+
+	/// A convenience function to generate a disabled bitmask for the given backing group.
+	/// The output bits are set to `true` for validators that are disabled.
+	/// Returns `None` if the group index is out of bounds.
+	pub fn disabled_bitmask(&self, group: GroupIndex) -> Option<BitVec<u8, Lsb0>> {
+		let group = self.groups.get(group)?;
+		let mask = BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)));
+		Some(mask)
+	}
+
+	/// Returns `true` if the given validator is disabled in the current session.
+	pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
+		self.disabled_validators.contains(validator_index)
+	}
+
+	/// Extend the list of disabled validators.
+	pub fn extend_disabled_validators(
+		&mut self,
+		disabled: impl IntoIterator<Item = ValidatorIndex>,
+	) {
+		self.disabled_validators.extend(disabled);
+	}
+
+	/// Clear the list of disabled validators.
+	pub fn clear_disabled_validators(&mut self) {
+		self.disabled_validators.clear();
+	}
 }
 
 pub(crate) struct State {
@@ -510,13 +549,20 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 
 	let new_relay_parents =
 		state.implicit_view.all_allowed_relay_parents().cloned().collect::<Vec<_>>();
-	for new_relay_parent in new_relay_parents.iter().cloned() {
-		if state.per_relay_parent.contains_key(&new_relay_parent) {
-			continue
-		}
 
-		// New leaf: fetch info from runtime API and initialize
-		// `per_relay_parent`.
+	// We clear the list of disabled validators to reset it properly based on union of leaves.
+	let mut cleared_disabled_validators: BTreeSet<SessionIndex> = BTreeSet::new();
+
+	for new_relay_parent in new_relay_parents.iter().cloned() {
+		// Even if we processed this relay parent before, we need to fetch the list of disabled
+		// validators based on union of active leaves.
+		let disabled_validators =
+			polkadot_node_subsystem_util::vstaging::get_disabled_validators_with_fallback(
+				ctx.sender(),
+				new_relay_parent,
+			)
+			.await
+			.map_err(JfyiError::FetchDisabledValidators)?;
 
 		let session_index = polkadot_node_subsystem_util::request_session_index_for_child(
 			new_relay_parent,
@@ -527,23 +573,6 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 		.map_err(JfyiError::RuntimeApiUnavailable)?
 		.map_err(JfyiError::FetchSessionIndex)?;
 
-		let availability_cores = polkadot_node_subsystem_util::request_availability_cores(
-			new_relay_parent,
-			ctx.sender(),
-		)
-		.await
-		.await
-		.map_err(JfyiError::RuntimeApiUnavailable)?
-		.map_err(JfyiError::FetchAvailabilityCores)?;
-
-		let group_rotation_info =
-			polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender())
-				.await
-				.await
-				.map_err(JfyiError::RuntimeApiUnavailable)?
-				.map_err(JfyiError::FetchValidatorGroups)?
-				.1;
-
 		if !state.per_session.contains_key(&session_index) {
 			let session_info = polkadot_node_subsystem_util::request_session_info(
 				new_relay_parent,
@@ -579,9 +608,49 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 
 		let per_session = state
 			.per_session
-			.get(&session_index)
+			.get_mut(&session_index)
 			.expect("either existed or just inserted; qed");
 
+		if cleared_disabled_validators.insert(session_index) {
+			per_session.clear_disabled_validators();
+		}
+
+		if !disabled_validators.is_empty() {
+			gum::debug!(
+				target: LOG_TARGET,
+				relay_parent = ?new_relay_parent,
+				?session_index,
+				?disabled_validators,
+				"Disabled validators detected"
+			);
+
+			per_session.extend_disabled_validators(disabled_validators);
+		}
+
+		if state.per_relay_parent.contains_key(&new_relay_parent) {
+			continue
+		}
+
+		// New leaf: fetch info from runtime API and initialize
+		// `per_relay_parent`.
+
+		let availability_cores = polkadot_node_subsystem_util::request_availability_cores(
+			new_relay_parent,
+			ctx.sender(),
+		)
+		.await
+		.await
+		.map_err(JfyiError::RuntimeApiUnavailable)?
+		.map_err(JfyiError::FetchAvailabilityCores)?;
+
+		let group_rotation_info =
+			polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender())
+				.await
+				.await
+				.map_err(JfyiError::RuntimeApiUnavailable)?
+				.map_err(JfyiError::FetchValidatorGroups)?
+				.1;
+
 		let local_validator = per_session.local_validator.and_then(|v| {
 			if let LocalValidatorIndex::Active(idx) = v {
 				find_active_validator_state(
@@ -1452,6 +1521,17 @@ async fn handle_incoming_statement<Context>(
 			},
 		};
 
+	if per_session.is_disabled(&statement.unchecked_validator_index()) {
+		gum::debug!(
+			target: LOG_TARGET,
+			?relay_parent,
+			validator_index = ?statement.unchecked_validator_index(),
+			"Ignoring a statement from disabled validator."
+		);
+		modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
+		return
+	}
+
 	let (active, cluster_sender_index) = {
 		// This block of code only returns `Some` when both the originator and
 		// the sending peer are in the cluster.
@@ -1572,7 +1652,7 @@ async fn handle_incoming_statement<Context>(
 		checked_statement.clone(),
 		StatementOrigin::Remote,
 	) {
-		Err(statement_store::ValidatorUnknown) => {
+		Err(statement_store::Error::ValidatorUnknown) => {
 			// sanity: should never happen.
 			gum::warn!(
 				target: LOG_TARGET,
@@ -2110,7 +2190,7 @@ async fn handle_incoming_manifest_common<'a, Context>(
 	candidate_hash: CandidateHash,
 	relay_parent: Hash,
 	para_id: ParaId,
-	manifest_summary: grid::ManifestSummary,
+	mut manifest_summary: grid::ManifestSummary,
 	manifest_kind: grid::ManifestKind,
 	reputation: &mut ReputationAggregator,
 ) -> Option<ManifestImportSuccess<'a>> {
@@ -2195,6 +2275,12 @@ async fn handle_incoming_manifest_common<'a, Context>(
 	// 2. sanity checks: peer is validator, bitvec size, import into grid tracker
 	let group_index = manifest_summary.claimed_group_index;
 	let claimed_parent_hash = manifest_summary.claimed_parent_hash;
+
+	// Ignore votes from disabled validators when counting towards the threshold.
+	let disabled_mask = per_session.disabled_bitmask(group_index).unwrap_or_default();
+	manifest_summary.statement_knowledge.mask_seconded(&disabled_mask);
+	manifest_summary.statement_knowledge.mask_valid(&disabled_mask);
+
 	let acknowledge = match local_validator.grid_tracker.import_manifest(
 		grid_topology,
 		&per_session.groups,
@@ -2770,6 +2856,13 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
 			}
 		}
 
+		// Add disabled validators to the unwanted mask.
+		let disabled_mask = per_session
+			.disabled_bitmask(group_index)
+			.expect("group existence checked above; qed");
+		unwanted_mask.seconded_in_group |= &disabled_mask;
+		unwanted_mask.validated_in_group |= &disabled_mask;
+
 		// don't require a backing threshold for cluster candidates.
 		let local_validator = relay_parent_state.local_validator.as_ref()?;
 		let require_backing = local_validator
@@ -2777,14 +2870,14 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
 			.as_ref()
 			.map_or(true, |active| active.group != group_index);
 
-		Some(RequestProperties {
-			unwanted_mask,
-			backing_threshold: if require_backing {
-				Some(per_session.groups.get_size_and_backing_threshold(group_index)?.1)
-			} else {
-				None
-			},
-		})
+		let backing_threshold = if require_backing {
+			let threshold = per_session.groups.get_size_and_backing_threshold(group_index)?.1;
+			Some(threshold)
+		} else {
+			None
+		};
+
+		Some(RequestProperties { unwanted_mask, backing_threshold })
 	};
 
 	while let Some(request) = state.request_manager.next_request(
@@ -2857,6 +2950,10 @@ pub(crate) async fn handle_response<Context>(
 			Some(g) => g,
 		};
 
+		let disabled_mask = per_session
+			.disabled_bitmask(group_index)
+			.expect("group_index checked above; qed");
+
 		let res = response.validate_response(
 			&mut state.request_manager,
 			group,
@@ -2871,6 +2968,7 @@ pub(crate) async fn handle_response<Context>(
 
 				Some(g_index) == expected_group
 			},
+			disabled_mask,
 		);
 
 		for (peer, rep) in res.reputation_changes {
@@ -2968,6 +3066,14 @@ pub(crate) async fn handle_response<Context>(
 	//    includable.
 }
 
+/// Returns true if the statement filter meets the backing threshold for grid requests.
+pub(crate) fn seconded_and_sufficient(
+	filter: &StatementFilter,
+	backing_threshold: Option<usize>,
+) -> bool {
+	backing_threshold.map_or(true, |t| filter.has_seconded() && filter.backing_validators() >= t)
+}
+
 /// Answer an incoming request for a candidate.
 pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
 	let ResponderMessage { request, sent_feedback } = message;
@@ -3008,11 +3114,13 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
 		Some(d) => d,
 	};
 
-	let group_size = per_session
+	let group_index = confirmed.group_index();
+	let group = per_session
 		.groups
-		.get(confirmed.group_index())
-		.expect("group from session's candidate always known; qed")
-		.len();
+		.get(group_index)
+		.expect("group from session's candidate always known; qed");
+
+	let group_size = group.len();
 
 	// check request bitfields are right size.
 	if mask.seconded_in_group.len() != group_size || mask.validated_in_group.len() != group_size {
@@ -3065,17 +3173,59 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
 
 	// Transform mask with 'OR' semantics into one with 'AND' semantics for the API used
 	// below.
-	let and_mask = StatementFilter {
+	let mut and_mask = StatementFilter {
 		seconded_in_group: !mask.seconded_in_group.clone(),
 		validated_in_group: !mask.validated_in_group.clone(),
 	};
 
+	// Ignore disabled validators from the latest state when sending the response.
+	let disabled_mask =
+		per_session.disabled_bitmask(group_index).expect("group existence checked; qed");
+	and_mask.mask_seconded(&disabled_mask);
+	and_mask.mask_valid(&disabled_mask);
+
+	let mut sent_filter = StatementFilter::blank(group_size);
 	let statements: Vec<_> = relay_parent_state
 		.statement_store
-		.group_statements(&per_session.groups, confirmed.group_index(), *candidate_hash, &and_mask)
-		.map(|s| s.as_unchecked().clone())
+		.group_statements(&per_session.groups, group_index, *candidate_hash, &and_mask)
+		.map(|s| {
+			let s = s.as_unchecked().clone();
+			let index_in_group = |v: ValidatorIndex| group.iter().position(|x| &v == x);
+			let Some(i) = index_in_group(s.unchecked_validator_index()) else { return s };
+
+			match s.unchecked_payload() {
+				CompactStatement::Seconded(_) => {
+					sent_filter.seconded_in_group.set(i, true);
+				},
+				CompactStatement::Valid(_) => {
+					sent_filter.validated_in_group.set(i, true);
+				},
+			}
+			s
+		})
 		.collect();
 
+	// There should be no response at all for grid requests when the
+	// backing threshold is no longer met as a result of disabled validators.
+	if !is_cluster {
+		let threshold = per_session
+			.groups
+			.get_size_and_backing_threshold(group_index)
+			.expect("group existence checked above; qed")
+			.1;
+
+		if !seconded_and_sufficient(&sent_filter, Some(threshold)) {
+			gum::info!(
+				target: LOG_TARGET,
+				?candidate_hash,
+				relay_parent = ?confirmed.relay_parent(),
+				?group_index,
+				"Dropping a request from a grid peer because the backing threshold is no longer met."
+			);
+			return
+		}
+	}
+
 	// Update bookkeeping about which statements peers have received.
 	for statement in &statements {
 		if is_cluster {
diff --git a/polkadot/node/network/statement-distribution/src/v2/requests.rs b/polkadot/node/network/statement-distribution/src/v2/requests.rs
index 8507a4b827690..bed3d5c18ae2b 100644
--- a/polkadot/node/network/statement-distribution/src/v2/requests.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/requests.rs
@@ -30,12 +30,13 @@
 //! (which requires state not owned by the request manager).
 
 use super::{
-	BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, COST_IMPROPERLY_DECODED_RESPONSE,
-	COST_INVALID_RESPONSE, COST_INVALID_SIGNATURE, COST_UNREQUESTED_RESPONSE_STATEMENT,
-	REQUEST_RETRY_DELAY,
+	seconded_and_sufficient, BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT,
+	COST_IMPROPERLY_DECODED_RESPONSE, COST_INVALID_RESPONSE, COST_INVALID_SIGNATURE,
+	COST_UNREQUESTED_RESPONSE_STATEMENT, REQUEST_RETRY_DELAY,
 };
 use crate::LOG_TARGET;
 
+use bitvec::prelude::{BitVec, Lsb0};
 use polkadot_node_network_protocol::{
 	request_response::{
 		outgoing::{Recipient as RequestRecipient, RequestError},
@@ -495,10 +496,6 @@ fn find_request_target_with_update(
 	}
 }
 
-fn seconded_and_sufficient(filter: &StatementFilter, backing_threshold: Option<usize>) -> bool {
-	backing_threshold.map_or(true, |t| filter.has_seconded() && filter.backing_validators() >= t)
-}
-
 /// A response to a request, which has not yet been handled.
 pub struct UnhandledResponse {
 	response: TaggedResponse,
@@ -542,6 +539,7 @@ impl UnhandledResponse {
 		session: SessionIndex,
 		validator_key_lookup: impl Fn(ValidatorIndex) -> Option<ValidatorId>,
 		allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool,
+		disabled_mask: BitVec<u8, Lsb0>,
 	) -> ResponseValidationOutput {
 		let UnhandledResponse {
 			response: TaggedResponse { identifier, requested_peer, props, response },
@@ -625,6 +623,7 @@ impl UnhandledResponse {
 			session,
 			validator_key_lookup,
 			allowed_para_lookup,
+			disabled_mask,
 		);
 
 		if let CandidateRequestStatus::Complete { .. } = output.request_status {
@@ -644,6 +643,7 @@ fn validate_complete_response(
 	session: SessionIndex,
 	validator_key_lookup: impl Fn(ValidatorIndex) -> Option<ValidatorId>,
 	allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool,
+	disabled_mask: BitVec<u8, Lsb0>,
 ) -> ResponseValidationOutput {
 	let RequestProperties { backing_threshold, mut unwanted_mask } = props;
 
@@ -751,6 +751,10 @@ fn validate_complete_response(
 				},
 			}
 
+			if disabled_mask.get(i).map_or(false, |x| *x) {
+				continue
+			}
+
 			let validator_public =
 				match validator_key_lookup(unchecked_statement.unchecked_validator_index()) {
 					None => {
@@ -1013,6 +1017,7 @@ mod tests {
 		let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
 
 		let unwanted_mask = StatementFilter::blank(group_size);
+		let disabled_mask: BitVec<u8, Lsb0> = Default::default();
 		let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
 
 		// Get requests.
@@ -1056,6 +1061,7 @@ mod tests {
 				0,
 				validator_key_lookup,
 				allowed_para_lookup,
+				disabled_mask.clone(),
 			);
 			assert_eq!(
 				output,
@@ -1094,6 +1100,7 @@ mod tests {
 				0,
 				validator_key_lookup,
 				allowed_para_lookup,
+				disabled_mask,
 			);
 			assert_eq!(
 				output,
@@ -1167,12 +1174,14 @@ mod tests {
 			};
 			let validator_key_lookup = |_v| None;
 			let allowed_para_lookup = |_para, _g_index| true;
+			let disabled_mask: BitVec<u8, Lsb0> = Default::default();
 			let output = response.validate_response(
 				&mut request_manager,
 				group,
 				0,
 				validator_key_lookup,
 				allowed_para_lookup,
+				disabled_mask,
 			);
 			assert_eq!(
 				output,
@@ -1245,12 +1254,14 @@ mod tests {
 			let validator_key_lookup = |_v| None;
 			let allowed_para_lookup = |_para, _g_index| true;
 			let statements = vec![];
+			let disabled_mask: BitVec<u8, Lsb0> = Default::default();
 			let output = response.validate_response(
 				&mut request_manager,
 				group,
 				0,
 				validator_key_lookup,
 				allowed_para_lookup,
+				disabled_mask,
 			);
 			assert_eq!(
 				output,
diff --git a/polkadot/node/network/statement-distribution/src/v2/statement_store.rs b/polkadot/node/network/statement-distribution/src/v2/statement_store.rs
index 96d976e22cd51..022461e55511c 100644
--- a/polkadot/node/network/statement-distribution/src/v2/statement_store.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/statement_store.rs
@@ -97,10 +97,10 @@ impl StatementStore {
 		groups: &Groups,
 		statement: SignedStatement,
 		origin: StatementOrigin,
-	) -> Result<bool, ValidatorUnknown> {
+	) -> Result<bool, Error> {
 		let validator_index = statement.validator_index();
 		let validator_meta = match self.validator_meta.get_mut(&validator_index) {
-			None => return Err(ValidatorUnknown),
+			None => return Err(Error::ValidatorUnknown),
 			Some(m) => m,
 		};
 
@@ -134,7 +134,7 @@ impl StatementStore {
 						"groups passed into `insert` differ from those used at store creation"
 					);
 
-					return Err(ValidatorUnknown)
+					return Err(Error::ValidatorUnknown)
 				},
 			};
 
@@ -251,9 +251,12 @@ impl StatementStore {
 	}
 }
 
-/// Error indicating that the validator was unknown.
+/// Error when inserting a statement into the statement store.
 #[derive(Debug)]
-pub struct ValidatorUnknown;
+pub enum Error {
+	/// The validator was unknown.
+	ValidatorUnknown,
+}
 
 type Fingerprint = (ValidatorIndex, CompactStatement);
 
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs
index a9f5b537b3238..7ffed9d47d4bd 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs
@@ -75,15 +75,7 @@ fn share_seconded_circulated_to_cluster() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		let full_signed = state
 			.sign_statement(
@@ -120,7 +112,7 @@ fn share_seconded_circulated_to_cluster() {
 
 		// sharing a `Seconded` message confirms a candidate, which leads to new
 		// fragment tree updates.
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		overseer
 	});
@@ -156,15 +148,7 @@ fn cluster_valid_statement_before_seconded_ignored() {
 		.await;
 
 		send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		let signed_valid = state.sign_statement(
 			v_a,
@@ -226,15 +210,7 @@ fn cluster_statement_bad_signature() {
 		.await;
 
 		send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// sign statements with wrong signing context, leading to bad signature.
 		let statements = vec![
@@ -308,15 +284,7 @@ fn useful_cluster_statement_from_non_cluster_peer_rejected() {
 		.await;
 
 		send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		let statement = state
 			.sign_statement(
@@ -370,15 +338,7 @@ fn statement_from_non_cluster_originator_unexpected() {
 		connect_peer(&mut overseer, peer_a.clone(), None).await;
 
 		send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		let statement = state
 			.sign_statement(
@@ -448,15 +408,7 @@ fn seconded_statement_leads_to_request() {
 		.await;
 
 		send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		let statement = state
 			.sign_statement(
@@ -497,7 +449,7 @@ fn seconded_statement_leads_to_request() {
 				if p == peer_a && r == BENEFIT_VALID_RESPONSE.into() => { }
 		);
 
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		overseer
 	});
@@ -544,15 +496,7 @@ fn cluster_statements_shared_seconded_first() {
 			.await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		let full_signed = state
 			.sign_statement(
@@ -579,7 +523,7 @@ fn cluster_statements_shared_seconded_first() {
 			.await;
 
 		// result of new confirmed candidate.
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		overseer
 			.send(FromOrchestra::Communication {
@@ -677,15 +621,7 @@ fn cluster_accounts_for_implicit_view() {
 			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		let full_signed = state
 			.sign_statement(
@@ -722,7 +658,7 @@ fn cluster_accounts_for_implicit_view() {
 
 		// sharing a `Seconded` message confirms a candidate, which leads to new
 		// fragment tree updates.
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		// activate new leaf, which has relay-parent in implicit view.
 		let next_relay_parent = Hash::repeat_byte(2);
@@ -730,15 +666,7 @@ fn cluster_accounts_for_implicit_view() {
 		next_test_leaf.parent_hash = relay_parent;
 		next_test_leaf.number = 2;
 
-		activate_leaf(&mut overseer, &next_test_leaf, &state, false).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(next_relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &next_test_leaf, &state, false, vec![]).await;
 
 		send_peer_view_change(&mut overseer, peer_a.clone(), view![next_relay_parent]).await;
 		send_peer_view_change(&mut overseer, peer_b.clone(), view![next_relay_parent]).await;
@@ -820,15 +748,7 @@ fn cluster_messages_imported_after_confirmed_candidate_importable_check() {
 			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Peer sends `Seconded` statement.
 		{
@@ -885,8 +805,6 @@ fn cluster_messages_imported_after_confirmed_candidate_importable_check() {
 				},
 				vec![(relay_parent, vec![0])],
 			)],
-			None,
-			false,
 		)
 		.await;
 
@@ -953,15 +871,7 @@ fn cluster_messages_imported_after_new_leaf_importable_check() {
 			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Peer sends `Seconded` statement.
 		{
@@ -1008,17 +918,18 @@ fn cluster_messages_imported_after_new_leaf_importable_check() {
 			);
 		}
 
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		let next_relay_parent = Hash::repeat_byte(2);
 		let mut next_test_leaf = state.make_dummy_leaf(next_relay_parent);
 		next_test_leaf.parent_hash = relay_parent;
 		next_test_leaf.number = 2;
 
-		activate_leaf(&mut overseer, &next_test_leaf, &state, false).await;
-
-		answer_expected_hypothetical_depth_request(
+		activate_leaf(
 			&mut overseer,
+			&next_test_leaf,
+			&state,
+			false,
 			vec![(
 				HypotheticalCandidate::Complete {
 					candidate_hash,
@@ -1027,8 +938,6 @@ fn cluster_messages_imported_after_new_leaf_importable_check() {
 				},
 				vec![(relay_parent, vec![0])],
 			)],
-			Some(next_relay_parent),
-			false,
 		)
 		.await;
 
@@ -1117,15 +1026,7 @@ fn ensure_seconding_limit_is_respected() {
 			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Confirm the candidates locally so that we don't send out requests.
 
@@ -1152,7 +1053,7 @@ fn ensure_seconding_limit_is_respected() {
 				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Candidate 2.
@@ -1178,7 +1079,7 @@ fn ensure_seconding_limit_is_respected() {
 				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Send first statement from peer A.
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs
index aa1a473b833f4..1ac9f4d45e714 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs
@@ -102,15 +102,7 @@ fn backed_candidate_leads_to_advertisement() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
@@ -137,7 +129,7 @@ fn backed_candidate_leads_to_advertisement() {
 				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Send enough statements to make candidate backable, make sure announcements are sent.
@@ -232,7 +224,7 @@ fn backed_candidate_leads_to_advertisement() {
 				}
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		overseer
@@ -320,15 +312,7 @@ fn received_advertisement_before_confirmation_leads_to_request() {
 			send_peer_view_change(&mut overseer, peer_d.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
@@ -400,7 +384,7 @@ fn received_advertisement_before_confirmation_leads_to_request() {
 					if p == peer_c && r == BENEFIT_VALID_RESPONSE.into() => { }
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		overseer
@@ -530,7 +514,7 @@ fn received_advertisement_after_backing_leads_to_acknowledgement() {
 			assert_peer_reported!(&mut overseer, peer_c, BENEFIT_VALID_STATEMENT);
 			assert_peer_reported!(&mut overseer, peer_c, BENEFIT_VALID_RESPONSE);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Receive Backed message.
@@ -561,7 +545,7 @@ fn received_advertisement_after_backing_leads_to_acknowledgement() {
 				}
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Receive a manifest about the same candidate from peer D.
@@ -733,7 +717,7 @@ fn received_acknowledgements_for_locally_confirmed() {
 				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Receive an unexpected acknowledgement from peer D.
@@ -798,7 +782,7 @@ fn received_acknowledgements_for_locally_confirmed() {
 				}
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Receive an unexpected acknowledgement from peer D.
@@ -930,7 +914,7 @@ fn received_acknowledgements_for_externally_confirmed() {
 			assert_peer_reported!(&mut overseer, peer_c, BENEFIT_VALID_STATEMENT);
 			assert_peer_reported!(&mut overseer, peer_c, BENEFIT_VALID_RESPONSE);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		let ack = BackedCandidateAcknowledgement {
@@ -1022,15 +1006,7 @@ fn received_advertisement_after_confirmation_before_backing() {
 			send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
@@ -1121,7 +1097,7 @@ fn received_advertisement_after_confirmation_before_backing() {
 					if p == peer_c && r == BENEFIT_VALID_RESPONSE.into()
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Receive advertisement from peer D (after confirmation but before backing).
@@ -1208,15 +1184,7 @@ fn additional_statements_are_shared_after_manifest_exchange() {
 			send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
@@ -1301,13 +1269,8 @@ fn additional_statements_are_shared_after_manifest_exchange() {
 			persisted_validation_data: pvd.clone(),
 		};
 		let membership = vec![(relay_parent, vec![0])];
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![(hypothetical, membership)],
-			None,
-			false,
-		)
-		.await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![(hypothetical, membership)])
+			.await;
 
 		// Statements are sent to the Backing subsystem.
 		{
@@ -1371,7 +1334,7 @@ fn additional_statements_are_shared_after_manifest_exchange() {
 				}
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Receive a manifest about the same candidate from peer D. Contains different statements.
@@ -1514,17 +1477,8 @@ fn advertisement_sent_when_peer_enters_relay_parent_view() {
 			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
-
-		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
 
 		// Confirm the candidate locally so that we don't send out requests.
@@ -1549,7 +1503,7 @@ fn advertisement_sent_when_peer_enters_relay_parent_view() {
 				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Send enough statements to make candidate backable, make sure announcements are sent.
@@ -1616,7 +1570,7 @@ fn advertisement_sent_when_peer_enters_relay_parent_view() {
 			})
 			.await;
 
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		// Relay parent enters view of peer C.
 		{
@@ -1737,17 +1691,8 @@ fn advertisement_not_re_sent_when_peer_re_enters_view() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
 
 		// Confirm the candidate locally so that we don't send out requests.
@@ -1772,7 +1717,7 @@ fn advertisement_not_re_sent_when_peer_re_enters_view() {
 				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Send enough statements to make candidate backable, make sure announcements are sent.
@@ -1867,7 +1812,7 @@ fn advertisement_not_re_sent_when_peer_re_enters_view() {
 				}
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Peer leaves view.
@@ -1949,17 +1894,8 @@ fn grid_statements_imported_to_backing() {
 			send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
-
-		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
 
 		// Receive an advertisement from C.
@@ -2042,13 +1978,8 @@ fn grid_statements_imported_to_backing() {
 			persisted_validation_data: pvd.clone(),
 		};
 		let membership = vec![(relay_parent, vec![0])];
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![(hypothetical, membership)],
-			None,
-			false,
-		)
-		.await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![(hypothetical, membership)])
+			.await;
 
 		// Receive messages from Backing subsystem.
 		{
@@ -2165,17 +2096,8 @@ fn advertisements_rejected_from_incorrect_peers() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
-
-		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
 
 		let manifest = BackedCandidateManifest {
@@ -2289,17 +2211,8 @@ fn manifest_rejected_with_unknown_relay_parent() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
 
 		let manifest = BackedCandidateManifest {
@@ -2391,17 +2304,8 @@ fn manifest_rejected_when_not_a_validator() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
-
-		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
 
 		let manifest = BackedCandidateManifest {
@@ -2498,17 +2402,8 @@ fn manifest_rejected_when_group_does_not_match_para() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
-
-		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
 
 		let manifest = BackedCandidateManifest {
@@ -2613,17 +2508,8 @@ fn peer_reported_for_advertisement_conflicting_with_confirmed_candidate() {
 			send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
 
 		let manifest = BackedCandidateManifest {
@@ -2713,7 +2599,7 @@ fn peer_reported_for_advertisement_conflicting_with_confirmed_candidate() {
 					if p == peer_c && r == BENEFIT_VALID_RESPONSE.into()
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Receive conflicting advertisement from peer C after confirmation.
@@ -2755,7 +2641,6 @@ fn inactive_local_participates_in_grid() {
 		async_backing_params: None,
 	};
 
-	let dummy_relay_parent = Hash::repeat_byte(2);
 	let relay_parent = Hash::repeat_byte(1);
 	let peer_a = PeerId::random();
 
@@ -2795,25 +2680,10 @@ fn inactive_local_participates_in_grid() {
 			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &dummy_leaf, &state, true).await;
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(dummy_relay_parent),
-			false,
-		)
-		.await;
-
+		activate_leaf(&mut overseer, &dummy_leaf, &state, true, vec![]).await;
 		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
-		activate_leaf(&mut overseer, &test_leaf, &state, false).await;
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, false, vec![]).await;
 
 		// Receive an advertisement from A.
 		let manifest = BackedCandidateManifest {
@@ -2876,7 +2746,7 @@ fn inactive_local_participates_in_grid() {
 			AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
 				if p == peer_a && r == BENEFIT_VALID_RESPONSE.into() => { }
 		);
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		overseer
 	});
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
index c34cf20d716ca..3ce43202b9542 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
@@ -187,12 +187,30 @@ impl TestState {
 					collator: None,
 				})
 			}),
+			disabled_validators: Default::default(),
 			para_data: (0..self.session_info.validator_groups.len())
 				.map(|i| (ParaId::from(i as u32), PerParaData::new(1, vec![1, 2, 3].into())))
 				.collect(),
+			minimum_backing_votes: 2,
 		}
 	}
 
+	fn make_dummy_leaf_with_disabled_validators(
+		&self,
+		relay_parent: Hash,
+		disabled_validators: Vec<ValidatorIndex>,
+	) -> TestLeaf {
+		TestLeaf { disabled_validators, ..self.make_dummy_leaf(relay_parent) }
+	}
+
+	fn make_dummy_leaf_with_min_backing_votes(
+		&self,
+		relay_parent: Hash,
+		minimum_backing_votes: u32,
+	) -> TestLeaf {
+		TestLeaf { minimum_backing_votes, ..self.make_dummy_leaf(relay_parent) }
+	}
+
 	fn make_availability_cores(&self, f: impl Fn(usize) -> CoreState) -> Vec<CoreState> {
 		(0..self.session_info.validator_groups.len()).map(f).collect()
 	}
@@ -240,6 +258,19 @@ impl TestState {
 			.collect()
 	}
 
+	fn index_within_group(
+		&self,
+		group_index: GroupIndex,
+		validator_index: ValidatorIndex,
+	) -> Option<usize> {
+		self.session_info
+			.validator_groups
+			.get(group_index)
+			.unwrap()
+			.iter()
+			.position(|&i| i == validator_index)
+	}
+
 	fn discovery_id(&self, validator_index: ValidatorIndex) -> AuthorityDiscoveryId {
 		self.session_info.discovery_keys[validator_index.0 as usize].clone()
 	}
@@ -284,7 +315,7 @@ impl TestState {
 		&mut self,
 		peer: PeerId,
 		request: AttestedCandidateRequest,
-	) -> impl Future<Output = sc_network::config::OutgoingResponse> {
+	) -> impl Future<Output = Option<sc_network::config::OutgoingResponse>> {
 		let (tx, rx) = futures::channel::oneshot::channel();
 		let req = sc_network::config::IncomingRequest {
 			peer,
@@ -293,7 +324,7 @@ impl TestState {
 		};
 		self.req_sender.send(req).await.unwrap();
 
-		rx.map(|r| r.unwrap())
+		rx.map(|r| r.ok())
 	}
 }
 
@@ -366,7 +397,9 @@ struct TestLeaf {
 	parent_hash: Hash,
 	session: SessionIndex,
 	availability_cores: Vec<CoreState>,
+	disabled_validators: Vec<ValidatorIndex>,
 	para_data: Vec<(ParaId, PerParaData)>,
+	minimum_backing_votes: u32,
 }
 
 impl TestLeaf {
@@ -447,9 +480,7 @@ async fn setup_test_and_connect_peers(
 		}
 	}
 
-	activate_leaf(overseer, &test_leaf, &state, true).await;
-
-	answer_expected_hypothetical_depth_request(overseer, vec![], Some(relay_parent), false).await;
+	activate_leaf(overseer, &test_leaf, &state, true, vec![]).await;
 
 	// Send gossip topology.
 	send_new_topology(overseer, state.make_dummy_topology()).await;
@@ -472,6 +503,7 @@ async fn activate_leaf(
 	leaf: &TestLeaf,
 	test_state: &TestState,
 	is_new_session: bool,
+	hypothetical_frontier: Vec<(HypotheticalCandidate, FragmentTreeMembership)>,
 ) {
 	let activated = new_leaf(leaf.hash, leaf.number);
 
@@ -481,7 +513,14 @@ async fn activate_leaf(
 		))))
 		.await;
 
-	handle_leaf_activation(virtual_overseer, leaf, test_state, is_new_session).await;
+	handle_leaf_activation(
+		virtual_overseer,
+		leaf,
+		test_state,
+		is_new_session,
+		hypothetical_frontier,
+	)
+	.await;
 }
 
 async fn handle_leaf_activation(
@@ -489,8 +528,18 @@ async fn handle_leaf_activation(
 	leaf: &TestLeaf,
 	test_state: &TestState,
 	is_new_session: bool,
+	hypothetical_frontier: Vec<(HypotheticalCandidate, FragmentTreeMembership)>,
 ) {
-	let TestLeaf { number, hash, parent_hash, para_data, session, availability_cores } = leaf;
+	let TestLeaf {
+		number,
+		hash,
+		parent_hash,
+		para_data,
+		session,
+		availability_cores,
+		disabled_validators,
+		minimum_backing_votes,
+	} = leaf;
 
 	assert_matches!(
 		virtual_overseer.recv().await,
@@ -530,51 +579,82 @@ async fn handle_leaf_activation(
 		}
 	);
 
-	assert_matches!(
-		virtual_overseer.recv().await,
-		AllMessages::RuntimeApi(
-			RuntimeApiMessage::Request(parent, RuntimeApiRequest::SessionIndexForChild(tx))) if parent == *hash => {
-			tx.send(Ok(*session)).unwrap();
-		}
-	);
-
-	assert_matches!(
-		virtual_overseer.recv().await,
-		AllMessages::RuntimeApi(
-			RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx))) if parent == *hash => {
-			tx.send(Ok(availability_cores.clone())).unwrap();
-		}
-	);
-
-	let validator_groups = test_state.session_info.validator_groups.to_vec();
-	let group_rotation_info =
-		GroupRotationInfo { session_start_block: 1, group_rotation_frequency: 12, now: 1 };
-	assert_matches!(
-		virtual_overseer.recv().await,
-		AllMessages::RuntimeApi(
-			RuntimeApiMessage::Request(parent, RuntimeApiRequest::ValidatorGroups(tx))) if parent == *hash => {
-			tx.send(Ok((validator_groups, group_rotation_info))).unwrap();
-		}
-	);
-
-	if is_new_session {
-		assert_matches!(
-			virtual_overseer.recv().await,
-			AllMessages::RuntimeApi(
-				RuntimeApiMessage::Request(parent, RuntimeApiRequest::SessionInfo(s, tx))) if parent == *hash && s == *session => {
+	loop {
+		match virtual_overseer.recv().await {
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				_parent,
+				RuntimeApiRequest::Version(tx),
+			)) => {
+				tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap();
+			},
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				parent,
+				RuntimeApiRequest::DisabledValidators(tx),
+			)) if parent == *hash => {
+				tx.send(Ok(disabled_validators.clone())).unwrap();
+			},
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				_parent,
+				RuntimeApiRequest::DisabledValidators(tx),
+			)) => {
+				tx.send(Ok(Vec::new())).unwrap();
+			},
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				_parent, // assume all active leaves are in the same session
+				RuntimeApiRequest::SessionIndexForChild(tx),
+			)) => {
+				tx.send(Ok(*session)).unwrap();
+			},
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				parent,
+				RuntimeApiRequest::SessionInfo(s, tx),
+			)) if parent == *hash && s == *session => {
+				assert!(is_new_session, "only expecting this call in a new session");
 				tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
-			}
-		);
-
-		assert_matches!(
-			virtual_overseer.recv().await,
+			},
 			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
 				parent,
 				RuntimeApiRequest::MinimumBackingVotes(session_index, tx),
 			)) if parent == *hash && session_index == *session => {
-				tx.send(Ok(2)).unwrap();
-			}
-		);
+				assert!(is_new_session, "only expecting this call in a new session");
+				tx.send(Ok(*minimum_backing_votes)).unwrap();
+			},
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				parent,
+				RuntimeApiRequest::AvailabilityCores(tx),
+			)) if parent == *hash => {
+				tx.send(Ok(availability_cores.clone())).unwrap();
+			},
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				parent,
+				RuntimeApiRequest::ValidatorGroups(tx),
+			)) if parent == *hash => {
+				let validator_groups = test_state.session_info.validator_groups.to_vec();
+				let group_rotation_info = GroupRotationInfo {
+					session_start_block: 1,
+					group_rotation_frequency: 12,
+					now: 1,
+				};
+				tx.send(Ok((validator_groups, group_rotation_info))).unwrap();
+			},
+			AllMessages::ProspectiveParachains(
+				ProspectiveParachainsMessage::GetHypotheticalFrontier(req, tx),
+			) => {
+				assert_eq!(req.fragment_tree_relay_parent, Some(*hash));
+				assert!(!req.backed_in_path_only);
+				for (i, (candidate, _)) in hypothetical_frontier.iter().enumerate() {
+					assert!(
+						req.candidates.iter().any(|c| &c == &candidate),
+						"did not receive request for hypothetical candidate {}",
+						i,
+					);
+				}
+				tx.send(hypothetical_frontier).unwrap();
+				// this is the last expected runtime api call
+				break
+			},
+			msg => panic!("unexpected runtime API call: {msg:?}"),
+		}
 	}
 }
 
@@ -614,16 +694,14 @@ async fn handle_sent_request(
 async fn answer_expected_hypothetical_depth_request(
 	virtual_overseer: &mut VirtualOverseer,
 	responses: Vec<(HypotheticalCandidate, FragmentTreeMembership)>,
-	expected_leaf_hash: Option<Hash>,
-	expected_backed_in_path_only: bool,
 ) {
 	assert_matches!(
 		virtual_overseer.recv().await,
 		AllMessages::ProspectiveParachains(
 			ProspectiveParachainsMessage::GetHypotheticalFrontier(req, tx)
 		) => {
-			assert_eq!(req.fragment_tree_relay_parent, expected_leaf_hash);
-			assert_eq!(req.backed_in_path_only, expected_backed_in_path_only);
+			assert_eq!(req.fragment_tree_relay_parent, None);
+			assert!(!req.backed_in_path_only);
 			for (i, (candidate, _)) in responses.iter().enumerate() {
 				assert!(
 					req.candidates.iter().any(|c| &c == &candidate),
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
index 1eec8290fabae..04934b31482ed 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
@@ -86,15 +86,7 @@ fn cluster_peer_allowed_to_send_incomplete_statements() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Peer in cluster sends a statement, triggering a request.
 		{
@@ -176,7 +168,7 @@ fn cluster_peer_allowed_to_send_incomplete_statements() {
 			);
 		}
 
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		overseer
 	});
@@ -272,15 +264,7 @@ fn peer_reported_for_providing_statements_meant_to_be_masked_out() {
 			send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
@@ -354,7 +338,7 @@ fn peer_reported_for_providing_statements_meant_to_be_masked_out() {
 					if p == peer_c && r == BENEFIT_VALID_RESPONSE.into()
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Peer C advertises candidate 2.
@@ -426,7 +410,7 @@ fn peer_reported_for_providing_statements_meant_to_be_masked_out() {
 					if p == peer_c && r == BENEFIT_VALID_RESPONSE.into()
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Peer C sends an announcement for candidate 3. Should hit seconding limit for validator 1.
@@ -537,15 +521,7 @@ fn peer_reported_for_not_enough_statements() {
 			send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
@@ -657,7 +633,7 @@ fn peer_reported_for_not_enough_statements() {
 					if p == peer_c && r == BENEFIT_VALID_RESPONSE.into()
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		overseer
@@ -725,15 +701,7 @@ fn peer_reported_for_duplicate_statements() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Peer in cluster sends a statement, triggering a request.
 		{
@@ -820,7 +788,7 @@ fn peer_reported_for_duplicate_statements() {
 			);
 		}
 
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		overseer
 	});
@@ -887,15 +855,7 @@ fn peer_reported_for_providing_statements_with_invalid_signatures() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Peer in cluster sends a statement, triggering a request.
 		{
@@ -958,7 +918,7 @@ fn peer_reported_for_providing_statements_with_invalid_signatures() {
 					if p == peer_a && r == BENEFIT_VALID_RESPONSE.into() => { }
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		overseer
@@ -1026,15 +986,7 @@ fn peer_reported_for_providing_statements_with_wrong_validator_id() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Peer in cluster sends a statement, triggering a request.
 		{
@@ -1096,7 +1048,7 @@ fn peer_reported_for_providing_statements_with_wrong_validator_id() {
 					if p == peer_a && r == BENEFIT_VALID_RESPONSE.into() => { }
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		overseer
@@ -1104,26 +1056,31 @@ fn peer_reported_for_providing_statements_with_wrong_validator_id() {
 }
 
 #[test]
-fn local_node_sanity_checks_incoming_requests() {
+fn disabled_validators_added_to_unwanted_mask() {
+	let group_size = 3;
 	let config = TestConfig {
 		validator_count: 20,
-		group_size: 3,
+		group_size,
 		local_validator: LocalRole::Validator,
 		async_backing_params: None,
 	};
 
 	let relay_parent = Hash::repeat_byte(1);
-	let peer_a = PeerId::random();
+	let peer_disabled = PeerId::random();
 	let peer_b = PeerId::random();
-	let peer_c = PeerId::random();
-	let peer_d = PeerId::random();
 
-	test_harness(config, |mut state, mut overseer| async move {
+	test_harness(config, |state, mut overseer| async move {
 		let local_validator = state.local.clone().unwrap();
 		let local_group_index = local_validator.group_index.unwrap();
 		let local_para = ParaId::from(local_group_index.0);
+		let other_group_validators = state.group_validators(local_group_index, true);
+		let index_disabled = other_group_validators[0];
+		let index_within_group = state.index_within_group(local_group_index, index_disabled);
+		let index_b = other_group_validators[1];
 
-		let test_leaf = state.make_dummy_leaf(relay_parent);
+		let disabled_validators = vec![index_disabled];
+		let test_leaf =
+			state.make_dummy_leaf_with_disabled_validators(relay_parent, disabled_validators);
 
 		let (candidate, pvd) = make_candidate(
 			relay_parent,
@@ -1135,200 +1092,164 @@ fn local_node_sanity_checks_incoming_requests() {
 		);
 		let candidate_hash = candidate.hash();
 
-		// peer A is in group, has relay parent in view.
-		// peer B is in group, has no relay parent in view.
-		// peer C is not in group, has relay parent in view.
+		// peer A is in group, has relay parent in view and disabled.
+		// peer B is in group, has relay parent in view.
 		{
-			let other_group_validators = state.group_validators(local_group_index, true);
-
 			connect_peer(
 				&mut overseer,
-				peer_a.clone(),
-				Some(vec![state.discovery_id(other_group_validators[0])].into_iter().collect()),
+				peer_disabled.clone(),
+				Some(vec![state.discovery_id(index_disabled)].into_iter().collect()),
 			)
 			.await;
-
 			connect_peer(
 				&mut overseer,
 				peer_b.clone(),
-				Some(vec![state.discovery_id(other_group_validators[1])].into_iter().collect()),
+				Some(vec![state.discovery_id(index_b)].into_iter().collect()),
 			)
 			.await;
-
-			connect_peer(&mut overseer, peer_c.clone(), None).await;
-
-			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
-			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
+			send_peer_view_change(&mut overseer, peer_disabled.clone(), view![relay_parent]).await;
+			send_peer_view_change(&mut overseer, peer_b.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		let mask = StatementFilter::blank(state.config.group_size);
+		let seconded_disabled = state
+			.sign_statement(
+				index_disabled,
+				CompactStatement::Seconded(candidate_hash),
+				&SigningContext { parent_hash: relay_parent, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
 
-		// Should drop requests for unknown candidates.
+		let seconded_b = state
+			.sign_statement(
+				index_b,
+				CompactStatement::Seconded(candidate_hash),
+				&SigningContext { parent_hash: relay_parent, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
 		{
-			let (pending_response, rx) = oneshot::channel();
-			state
-				.req_sender
-				.send(RawIncomingRequest {
-					// Request from peer that received manifest.
-					peer: peer_c,
-					payload: request_v2::AttestedCandidateRequest {
-						candidate_hash: candidate.hash(),
-						mask: mask.clone(),
-					}
-					.encode(),
-					pending_response,
-				})
-				.await
-				.unwrap();
+			send_peer_message(
+				&mut overseer,
+				peer_disabled.clone(),
+				protocol_v2::StatementDistributionMessage::Statement(
+					relay_parent,
+					seconded_disabled.clone(),
+				),
+			)
+			.await;
 
-			assert_matches!(rx.await, Err(oneshot::Canceled));
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_disabled && r == COST_DISABLED_VALIDATOR.into() => { }
+			);
 		}
 
-		// Confirm candidate.
 		{
-			let full_signed = state
-				.sign_statement(
-					local_validator.validator_index,
-					CompactStatement::Seconded(candidate_hash),
-					&SigningContext { session_index: 1, parent_hash: relay_parent },
-				)
-				.convert_to_superpayload(StatementWithPVD::Seconded(candidate.clone(), pvd.clone()))
-				.unwrap();
-
-			overseer
-				.send(FromOrchestra::Communication {
-					msg: StatementDistributionMessage::Share(relay_parent, full_signed),
-				})
-				.await;
+			send_peer_message(
+				&mut overseer,
+				peer_b.clone(),
+				protocol_v2::StatementDistributionMessage::Statement(
+					relay_parent,
+					seconded_b.clone(),
+				),
+			)
+			.await;
 
 			assert_matches!(
 				overseer.recv().await,
-				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
-					peers,
-					Versioned::V2(protocol_v2::ValidationProtocol::StatementDistribution(
-						protocol_v2::StatementDistributionMessage::Statement(
-							r,
-							s,
-						)
-					))
-				)) => {
-					assert_eq!(peers, vec![peer_a.clone()]);
-					assert_eq!(r, relay_parent);
-					assert_eq!(s.unchecked_payload(), &CompactStatement::Seconded(candidate_hash));
-					assert_eq!(s.unchecked_validator_index(), local_validator.validator_index);
-				}
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_b && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { }
 			);
-
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
 		}
 
-		// Should drop requests from unknown peers.
+		// Send a request to peer and mock its response with a statement from disabled validator.
 		{
-			let (pending_response, rx) = oneshot::channel();
-			state
-				.req_sender
-				.send(RawIncomingRequest {
-					// Request from peer that received manifest.
-					peer: peer_d,
-					payload: request_v2::AttestedCandidateRequest {
-						candidate_hash: candidate.hash(),
-						mask: mask.clone(),
-					}
-					.encode(),
-					pending_response,
-				})
-				.await
-				.unwrap();
-
-			assert_matches!(rx.await, Err(oneshot::Canceled));
-		}
+			let statements = vec![seconded_disabled];
+			let mut mask = StatementFilter::blank(group_size);
+			let i = index_within_group.unwrap();
+			mask.seconded_in_group.set(i, true);
+			mask.validated_in_group.set(i, true);
 
-		// Should drop requests with bitfields of the wrong size.
-		{
-			let mask = StatementFilter::blank(state.config.group_size + 1);
-			let response = state
-				.send_request(
-					peer_c,
-					request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask },
-				)
-				.await
-				.await;
+			handle_sent_request(
+				&mut overseer,
+				peer_b,
+				candidate_hash,
+				mask,
+				candidate.clone(),
+				pvd.clone(),
+				statements,
+			)
+			.await;
 
 			assert_matches!(
-				response,
-				RawOutgoingResponse {
-					result,
-					reputation_changes,
-					sent_feedback
-				} => {
-					assert_matches!(result, Err(()));
-					assert_eq!(reputation_changes, vec![COST_INVALID_REQUEST_BITFIELD_SIZE.into()]);
-					assert_matches!(sent_feedback, None);
-				}
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_b && r == COST_UNREQUESTED_RESPONSE_STATEMENT.into() => { }
 			);
-		}
 
-		// Local node should reject requests if we did not send a manifest to that peer.
-		{
-			let response = state
-				.send_request(
-					peer_c,
-					request_v2::AttestedCandidateRequest {
-						candidate_hash: candidate.hash(),
-						mask: mask.clone(),
-					},
-				)
-				.await
-				.await;
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_b && r == BENEFIT_VALID_RESPONSE.into() => { }
+			);
 
-			// Should get `COST_UNEXPECTED_REQUEST` response.
 			assert_matches!(
-				response,
-				RawOutgoingResponse {
-					result,
-					reputation_changes,
-					sent_feedback
-				} => {
-					assert_matches!(result, Err(()));
-					assert_eq!(reputation_changes, vec![COST_UNEXPECTED_REQUEST.into()]);
-					assert_matches!(sent_feedback, None);
+				overseer.recv().await,
+				AllMessages:: NetworkBridgeTx(
+					NetworkBridgeTxMessage::SendValidationMessage(
+						peers,
+						Versioned::V2(
+							protocol_v2::ValidationProtocol::StatementDistribution(
+								protocol_v2::StatementDistributionMessage::Statement(hash, statement),
+							),
+						),
+					)
+				) => {
+					assert_eq!(peers, vec![peer_disabled]);
+					assert_eq!(hash, relay_parent);
+					assert_eq!(statement, seconded_b);
 				}
 			);
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		overseer
 	});
 }
 
+// We send a request to a peer and after receiving the response
+// we learn about a validator being disabled. We should filter out
+// the statement from the disabled validator when receiving it.
 #[test]
-fn local_node_checks_that_peer_can_request_before_responding() {
+fn when_validator_disabled_after_sending_the_request() {
+	let group_size = 3;
 	let config = TestConfig {
 		validator_count: 20,
-		group_size: 3,
+		group_size,
 		local_validator: LocalRole::Validator,
 		async_backing_params: None,
 	};
 
 	let relay_parent = Hash::repeat_byte(1);
-	let peer_a = PeerId::random();
+	let another_relay_parent = Hash::repeat_byte(2);
+	let peer_disabled_later = PeerId::random();
 	let peer_b = PeerId::random();
 
-	test_harness(config, |mut state, mut overseer| async move {
+	test_harness(config, |state, mut overseer| async move {
 		let local_validator = state.local.clone().unwrap();
 		let local_group_index = local_validator.group_index.unwrap();
 		let local_para = ParaId::from(local_group_index.0);
+		let other_group_validators = state.group_validators(local_group_index, true);
+		let index_disabled = other_group_validators[0];
+		let index_b = other_group_validators[1];
 
-		let test_leaf = state.make_dummy_leaf(relay_parent);
+		let test_leaf = state.make_dummy_leaf_with_disabled_validators(relay_parent, vec![]);
+		let test_leaf_disabled = state
+			.make_dummy_leaf_with_disabled_validators(another_relay_parent, vec![index_disabled]);
 
 		let (candidate, pvd) = make_candidate(
 			relay_parent,
@@ -1340,20 +1261,733 @@ fn local_node_checks_that_peer_can_request_before_responding() {
 		);
 		let candidate_hash = candidate.hash();
 
-		// Peers A and B are in group and have relay parent in view.
-		let other_group_validators = state.group_validators(local_group_index, true);
+		// peer A is in group, has relay parent in view and disabled later.
+		// peer B is in group, has relay parent in view.
+		{
+			connect_peer(
+				&mut overseer,
+				peer_disabled_later.clone(),
+				Some(vec![state.discovery_id(index_disabled)].into_iter().collect()),
+			)
+			.await;
+			connect_peer(
+				&mut overseer,
+				peer_b.clone(),
+				Some(vec![state.discovery_id(index_b)].into_iter().collect()),
+			)
+			.await;
+			send_peer_view_change(&mut overseer, peer_disabled_later.clone(), view![relay_parent])
+				.await;
+			send_peer_view_change(&mut overseer, peer_b.clone(), view![relay_parent]).await;
+		}
 
-		connect_peer(
-			&mut overseer,
-			peer_a.clone(),
-			Some(vec![state.discovery_id(other_group_validators[0])].into_iter().collect()),
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
-		connect_peer(
-			&mut overseer,
-			peer_b.clone(),
-			Some(vec![state.discovery_id(other_group_validators[1])].into_iter().collect()),
+		let seconded_disabled = state
+			.sign_statement(
+				index_disabled,
+				CompactStatement::Seconded(candidate_hash),
+				&SigningContext { parent_hash: relay_parent, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
+
+		let seconded_b = state
+			.sign_statement(
+				index_b,
+				CompactStatement::Seconded(candidate_hash),
+				&SigningContext { parent_hash: relay_parent, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
+		{
+			send_peer_message(
+				&mut overseer,
+				peer_b.clone(),
+				protocol_v2::StatementDistributionMessage::Statement(
+					relay_parent,
+					seconded_b.clone(),
+				),
+			)
+			.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_b && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { }
+			);
+		}
+
+		// Send a request to peer and activate leaf when a validator is disabled;
+		// mock the response with a statement from disabled validator.
+		{
+			let statements = vec![seconded_disabled];
+			let mask = StatementFilter::blank(group_size);
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(mut requests, IfDisconnected::ImmediateError)) => {
+					assert_eq!(requests.len(), 1);
+					assert_matches!(
+						requests.pop().unwrap(),
+						Requests::AttestedCandidateV2(outgoing) => {
+							assert_eq!(outgoing.peer, Recipient::Peer(peer_b));
+							assert_eq!(outgoing.payload.candidate_hash, candidate_hash);
+							assert_eq!(outgoing.payload.mask, mask);
+
+							activate_leaf(&mut overseer, &test_leaf_disabled, &state, false, vec![]).await;
+
+							let res = AttestedCandidateResponse {
+								candidate_receipt: candidate,
+								persisted_validation_data: pvd,
+								statements,
+							};
+							outgoing.pending_response.send(Ok(res.encode())).unwrap();
+						}
+					);
+				}
+			);
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_b && r == BENEFIT_VALID_RESPONSE.into() => { }
+			);
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages:: NetworkBridgeTx(
+					NetworkBridgeTxMessage::SendValidationMessage(
+						peers,
+						Versioned::V2(
+							protocol_v2::ValidationProtocol::StatementDistribution(
+								protocol_v2::StatementDistributionMessage::Statement(hash, statement),
+							),
+						),
+					)
+				) => {
+					assert_eq!(peers, vec![peer_disabled_later]);
+					assert_eq!(hash, relay_parent);
+					assert_eq!(statement, seconded_b);
+				}
+			);
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
+		}
+
+		overseer
+	});
+}
+
+#[test]
+fn no_response_for_grid_request_not_meeting_quorum() {
+	let validator_count = 6;
+	let group_size = 3;
+	let config = TestConfig {
+		validator_count,
+		group_size,
+		local_validator: LocalRole::Validator,
+		async_backing_params: None,
+	};
+
+	let relay_parent = Hash::repeat_byte(1);
+	let peer_a = PeerId::random();
+	let peer_b = PeerId::random();
+	let peer_c = PeerId::random();
+
+	test_harness(config, |mut state, mut overseer| async move {
+		let local_validator = state.local.clone().unwrap();
+		let local_group_index = local_validator.group_index.unwrap();
+		let local_para = ParaId::from(local_group_index.0);
+
+		let test_leaf = state.make_dummy_leaf_with_min_backing_votes(relay_parent, 2);
+
+		let (candidate, pvd) = make_candidate(
+			relay_parent,
+			1,
+			local_para,
+			test_leaf.para_data(local_para).head_data.clone(),
+			vec![4, 5, 6].into(),
+			Hash::repeat_byte(42).into(),
+		);
+		let candidate_hash = candidate.hash();
+
+		let other_group_validators = state.group_validators(local_group_index, true);
+		let target_group_validators =
+			state.group_validators((local_group_index.0 + 1).into(), true);
+		let v_a = other_group_validators[0];
+		let v_b = other_group_validators[1];
+		let v_c = target_group_validators[0];
+
+		// peer A is in group, has relay parent in view.
+		// peer B is in group, has no relay parent in view.
+		// peer C is not in group, has relay parent in view.
+		{
+			connect_peer(
+				&mut overseer,
+				peer_a.clone(),
+				Some(vec![state.discovery_id(v_a)].into_iter().collect()),
+			)
+			.await;
+
+			connect_peer(
+				&mut overseer,
+				peer_b.clone(),
+				Some(vec![state.discovery_id(v_b)].into_iter().collect()),
+			)
+			.await;
+
+			connect_peer(
+				&mut overseer,
+				peer_c.clone(),
+				Some(vec![state.discovery_id(v_c)].into_iter().collect()),
+			)
+			.await;
+
+			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
+			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
+		}
+
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
+
+		// Send gossip topology.
+		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
+
+		// Confirm the candidate locally so that we don't send out requests.
+		{
+			let statement = state
+				.sign_full_statement(
+					local_validator.validator_index,
+					Statement::Seconded(candidate.clone()),
+					&SigningContext { parent_hash: relay_parent, session_index: 1 },
+					pvd.clone(),
+				)
+				.clone();
+
+			overseer
+				.send(FromOrchestra::Communication {
+					msg: StatementDistributionMessage::Share(relay_parent, statement),
+				})
+				.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
+			);
+
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
+		}
+
+		// Send enough statements to make candidate backable, make sure announcements are sent.
+
+		// Send statement from peer A.
+		{
+			let statement = state
+				.sign_statement(
+					v_a,
+					CompactStatement::Seconded(candidate_hash),
+					&SigningContext { parent_hash: relay_parent, session_index: 1 },
+				)
+				.as_unchecked()
+				.clone();
+
+			send_peer_message(
+				&mut overseer,
+				peer_a.clone(),
+				protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement),
+			)
+			.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { }
+			);
+		}
+
+		// Send statement from peer B.
+		let statement_b = state
+			.sign_statement(
+				v_b,
+				CompactStatement::Seconded(candidate_hash),
+				&SigningContext { parent_hash: relay_parent, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
+		{
+			send_peer_message(
+				&mut overseer,
+				peer_b.clone(),
+				protocol_v2::StatementDistributionMessage::Statement(
+					relay_parent,
+					statement_b.clone(),
+				),
+			)
+			.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_b && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { }
+			);
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
+			);
+		}
+
+		// Send Backed notification.
+		{
+			overseer
+				.send(FromOrchestra::Communication {
+					msg: StatementDistributionMessage::Backed(candidate_hash),
+				})
+				.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages:: NetworkBridgeTx(
+					NetworkBridgeTxMessage::SendValidationMessage(
+						peers,
+						Versioned::V2(
+							protocol_v2::ValidationProtocol::StatementDistribution(
+								protocol_v2::StatementDistributionMessage::BackedCandidateManifest(manifest),
+							),
+						),
+					)
+				) => {
+					assert_eq!(peers, vec![peer_c]);
+					assert_eq!(manifest, BackedCandidateManifest {
+						relay_parent,
+						candidate_hash,
+						group_index: local_validator.group_index.unwrap(),
+						para_id: local_para,
+						parent_head_data_hash: pvd.parent_head.hash(),
+						statement_knowledge: StatementFilter {
+							seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1],
+							validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
+						},
+					});
+				}
+			);
+
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
+		}
+
+		let mask = StatementFilter {
+			seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
+			validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
+		};
+
+		let relay_2 = Hash::repeat_byte(2);
+		let disabled_validators = vec![v_a];
+		let leaf_2 = state.make_dummy_leaf_with_disabled_validators(relay_2, disabled_validators);
+		activate_leaf(&mut overseer, &leaf_2, &state, false, vec![]).await;
+
+		// Incoming request to local node. Local node should not send the response as v_a is
+		// disabled and hence the quorum is not reached.
+		{
+			let response = state
+				.send_request(
+					peer_c,
+					request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask },
+				)
+				.await
+				.await;
+
+			assert!(
+				response.is_none(),
+				"We should not send a response as the quorum is not reached yet"
+			);
+		}
+
+		overseer
+	});
+}
+
+#[test]
+fn disabling_works_from_the_latest_state_not_relay_parent() {
+	let group_size = 3;
+	let config = TestConfig {
+		validator_count: 20,
+		group_size,
+		local_validator: LocalRole::Validator,
+		async_backing_params: None,
+	};
+
+	let relay_1 = Hash::repeat_byte(1);
+	let relay_2 = Hash::repeat_byte(2);
+	let peer_disabled = PeerId::random();
+
+	test_harness(config, |state, mut overseer| async move {
+		let local_validator = state.local.clone().unwrap();
+		let local_group_index = local_validator.group_index.unwrap();
+		let local_para = ParaId::from(local_group_index.0);
+
+		let other_group_validators = state.group_validators(local_group_index, true);
+		let index_disabled = other_group_validators[0];
+
+		let leaf_1 = state.make_dummy_leaf(relay_1);
+		let disabled_validators = vec![index_disabled];
+		let leaf_2 = state.make_dummy_leaf_with_disabled_validators(relay_2, disabled_validators);
+
+		let (candidate_1, pvd_1) = make_candidate(
+			relay_1,
+			1,
+			local_para,
+			leaf_1.para_data(local_para).head_data.clone(),
+			vec![4, 5, 6].into(),
+			Hash::repeat_byte(42).into(),
+		);
+		let candidate_1_hash = candidate_1.hash();
+
+		let (candidate_2, _) = make_candidate(
+			relay_1,
+			1,
+			local_para,
+			leaf_1.para_data(local_para).head_data.clone(),
+			vec![4, 5, 6, 7].into(),
+			Hash::repeat_byte(42).into(),
+		);
+		let candidate_2_hash = candidate_2.hash();
+
+		{
+			connect_peer(
+				&mut overseer,
+				peer_disabled.clone(),
+				Some(vec![state.discovery_id(index_disabled)].into_iter().collect()),
+			)
+			.await;
+			send_peer_view_change(&mut overseer, peer_disabled.clone(), view![relay_1]).await;
+		}
+
+		activate_leaf(&mut overseer, &leaf_1, &state, true, vec![]).await;
+
+		let seconded_1 = state
+			.sign_statement(
+				index_disabled,
+				CompactStatement::Seconded(candidate_1_hash),
+				&SigningContext { parent_hash: relay_1, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
+
+		let seconded_2 = state
+			.sign_statement(
+				index_disabled,
+				CompactStatement::Seconded(candidate_2_hash),
+				&SigningContext { parent_hash: relay_1, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
+		{
+			send_peer_message(
+				&mut overseer,
+				peer_disabled.clone(),
+				protocol_v2::StatementDistributionMessage::Statement(relay_1, seconded_1.clone()),
+			)
+			.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+				if p == peer_disabled && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { }
+			);
+		}
+
+		{
+			handle_sent_request(
+				&mut overseer,
+				peer_disabled,
+				candidate_1_hash,
+				StatementFilter::blank(group_size),
+				candidate_1.clone(),
+				pvd_1.clone(),
+				vec![seconded_1.clone()],
+			)
+			.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_disabled && r == BENEFIT_VALID_STATEMENT.into() => { }
+			);
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_disabled && r == BENEFIT_VALID_RESPONSE.into() => { }
+			);
+
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
+		}
+
+		activate_leaf(&mut overseer, &leaf_2, &state, false, vec![]).await;
+
+		{
+			send_peer_message(
+				&mut overseer,
+				peer_disabled.clone(),
+				protocol_v2::StatementDistributionMessage::Statement(relay_1, seconded_2.clone()),
+			)
+			.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_disabled && r == COST_DISABLED_VALIDATOR.into() => { }
+			);
+		}
+
+		overseer
+	});
+}
+
+#[test]
+fn local_node_sanity_checks_incoming_requests() {
+	let config = TestConfig {
+		validator_count: 20,
+		group_size: 3,
+		local_validator: LocalRole::Validator,
+		async_backing_params: None,
+	};
+
+	let relay_parent = Hash::repeat_byte(1);
+	let peer_a = PeerId::random();
+	let peer_b = PeerId::random();
+	let peer_c = PeerId::random();
+	let peer_d = PeerId::random();
+
+	test_harness(config, |mut state, mut overseer| async move {
+		let local_validator = state.local.clone().unwrap();
+		let local_group_index = local_validator.group_index.unwrap();
+		let local_para = ParaId::from(local_group_index.0);
+
+		let test_leaf = state.make_dummy_leaf(relay_parent);
+
+		let (candidate, pvd) = make_candidate(
+			relay_parent,
+			1,
+			local_para,
+			test_leaf.para_data(local_para).head_data.clone(),
+			vec![4, 5, 6].into(),
+			Hash::repeat_byte(42).into(),
+		);
+		let candidate_hash = candidate.hash();
+
+		// peer A is in group, has relay parent in view.
+		// peer B is in group, has no relay parent in view.
+		// peer C is not in group, has relay parent in view.
+		{
+			let other_group_validators = state.group_validators(local_group_index, true);
+
+			connect_peer(
+				&mut overseer,
+				peer_a.clone(),
+				Some(vec![state.discovery_id(other_group_validators[0])].into_iter().collect()),
+			)
+			.await;
+
+			connect_peer(
+				&mut overseer,
+				peer_b.clone(),
+				Some(vec![state.discovery_id(other_group_validators[1])].into_iter().collect()),
+			)
+			.await;
+
+			connect_peer(&mut overseer, peer_c.clone(), None).await;
+
+			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
+			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
+		}
+
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
+
+		let mask = StatementFilter::blank(state.config.group_size);
+
+		// Should drop requests for unknown candidates.
+		{
+			let (pending_response, rx) = oneshot::channel();
+			state
+				.req_sender
+				.send(RawIncomingRequest {
+					// Request from peer that received manifest.
+					peer: peer_c,
+					payload: request_v2::AttestedCandidateRequest {
+						candidate_hash: candidate.hash(),
+						mask: mask.clone(),
+					}
+					.encode(),
+					pending_response,
+				})
+				.await
+				.unwrap();
+
+			assert_matches!(rx.await, Err(oneshot::Canceled));
+		}
+
+		// Confirm candidate.
+		{
+			let full_signed = state
+				.sign_statement(
+					local_validator.validator_index,
+					CompactStatement::Seconded(candidate_hash),
+					&SigningContext { session_index: 1, parent_hash: relay_parent },
+				)
+				.convert_to_superpayload(StatementWithPVD::Seconded(candidate.clone(), pvd.clone()))
+				.unwrap();
+
+			overseer
+				.send(FromOrchestra::Communication {
+					msg: StatementDistributionMessage::Share(relay_parent, full_signed),
+				})
+				.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
+					peers,
+					Versioned::V2(protocol_v2::ValidationProtocol::StatementDistribution(
+						protocol_v2::StatementDistributionMessage::Statement(
+							r,
+							s,
+						)
+					))
+				)) => {
+					assert_eq!(peers, vec![peer_a.clone()]);
+					assert_eq!(r, relay_parent);
+					assert_eq!(s.unchecked_payload(), &CompactStatement::Seconded(candidate_hash));
+					assert_eq!(s.unchecked_validator_index(), local_validator.validator_index);
+				}
+			);
+
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
+		}
+
+		// Should drop requests from unknown peers.
+		{
+			let (pending_response, rx) = oneshot::channel();
+			state
+				.req_sender
+				.send(RawIncomingRequest {
+					// Request from peer that received manifest.
+					peer: peer_d,
+					payload: request_v2::AttestedCandidateRequest {
+						candidate_hash: candidate.hash(),
+						mask: mask.clone(),
+					}
+					.encode(),
+					pending_response,
+				})
+				.await
+				.unwrap();
+
+			assert_matches!(rx.await, Err(oneshot::Canceled));
+		}
+
+		// Should drop requests with bitfields of the wrong size.
+		{
+			let mask = StatementFilter::blank(state.config.group_size + 1);
+			let response = state
+				.send_request(
+					peer_c,
+					request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask },
+				)
+				.await
+				.await
+				.unwrap();
+
+			assert_matches!(
+				response,
+				RawOutgoingResponse {
+					result,
+					reputation_changes,
+					sent_feedback
+				} => {
+					assert_matches!(result, Err(()));
+					assert_eq!(reputation_changes, vec![COST_INVALID_REQUEST_BITFIELD_SIZE.into()]);
+					assert_matches!(sent_feedback, None);
+				}
+			);
+		}
+
+		// Local node should reject requests if we did not send a manifest to that peer.
+		{
+			let response = state
+				.send_request(
+					peer_c,
+					request_v2::AttestedCandidateRequest {
+						candidate_hash: candidate.hash(),
+						mask: mask.clone(),
+					},
+				)
+				.await
+				.await
+				.unwrap();
+
+			// Should get `COST_UNEXPECTED_REQUEST` response.
+			assert_matches!(
+				response,
+				RawOutgoingResponse {
+					result,
+					reputation_changes,
+					sent_feedback
+				} => {
+					assert_matches!(result, Err(()));
+					assert_eq!(reputation_changes, vec![COST_UNEXPECTED_REQUEST.into()]);
+					assert_matches!(sent_feedback, None);
+				}
+			);
+		}
+
+		overseer
+	});
+}
+
+#[test]
+fn local_node_checks_that_peer_can_request_before_responding() {
+	let config = TestConfig {
+		validator_count: 20,
+		group_size: 3,
+		local_validator: LocalRole::Validator,
+		async_backing_params: None,
+	};
+
+	let relay_parent = Hash::repeat_byte(1);
+	let peer_a = PeerId::random();
+	let peer_b = PeerId::random();
+
+	test_harness(config, |mut state, mut overseer| async move {
+		let local_validator = state.local.clone().unwrap();
+		let local_group_index = local_validator.group_index.unwrap();
+		let local_para = ParaId::from(local_group_index.0);
+
+		let test_leaf = state.make_dummy_leaf(relay_parent);
+
+		let (candidate, pvd) = make_candidate(
+			relay_parent,
+			1,
+			local_para,
+			test_leaf.para_data(local_para).head_data.clone(),
+			vec![4, 5, 6].into(),
+			Hash::repeat_byte(42).into(),
+		);
+		let candidate_hash = candidate.hash();
+
+		// Peers A and B are in group and have relay parent in view.
+		let other_group_validators = state.group_validators(local_group_index, true);
+
+		connect_peer(
+			&mut overseer,
+			peer_a.clone(),
+			Some(vec![state.discovery_id(other_group_validators[0])].into_iter().collect()),
+		)
+		.await;
+
+		connect_peer(
+			&mut overseer,
+			peer_b.clone(),
+			Some(vec![state.discovery_id(other_group_validators[1])].into_iter().collect()),
 		)
 		.await;
 		let peer_b_index = other_group_validators[1];
@@ -1362,15 +1996,7 @@ fn local_node_checks_that_peer_can_request_before_responding() {
 		send_peer_view_change(&mut overseer, peer_b.clone(), view![relay_parent]).await;
 
 		// Finish setup
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		let mask = StatementFilter::blank(state.config.group_size);
 
@@ -1409,7 +2035,7 @@ fn local_node_checks_that_peer_can_request_before_responding() {
 			}
 		);
 
-		answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 
 		// Local node should respond to requests from peers in the same group
 		// which appear to not have already seen the candidate
@@ -1424,7 +2050,8 @@ fn local_node_checks_that_peer_can_request_before_responding() {
 					},
 				)
 				.await
-				.await;
+				.await
+				.unwrap();
 
 			let expected_statements = vec![signed.into_unchecked()];
 			assert_matches!(response, full_response => {
@@ -1473,7 +2100,8 @@ fn local_node_checks_that_peer_can_request_before_responding() {
 					},
 				)
 				.await
-				.await;
+				.await
+				.unwrap();
 
 			// Peer already knows about this candidate. Should reject.
 			assert_matches!(
@@ -1535,7 +2163,7 @@ fn local_node_respects_statement_mask() {
 		let local_group_index = local_validator.group_index.unwrap();
 		let local_para = ParaId::from(local_group_index.0);
 
-		let test_leaf = state.make_dummy_leaf(relay_parent);
+		let test_leaf = state.make_dummy_leaf_with_min_backing_votes(relay_parent, 2);
 
 		let (candidate, pvd) = make_candidate(
 			relay_parent,
@@ -1592,15 +2220,7 @@ fn local_node_respects_statement_mask() {
 			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
@@ -1627,26 +2247,28 @@ fn local_node_respects_statement_mask() {
 				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Send enough statements to make candidate backable, make sure announcements are sent.
 
 		// Send statement from peer A.
+		let statement_a = state
+			.sign_statement(
+				v_a,
+				CompactStatement::Seconded(candidate_hash),
+				&SigningContext { parent_hash: relay_parent, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
 		{
-			let statement = state
-				.sign_statement(
-					v_a,
-					CompactStatement::Seconded(candidate_hash),
-					&SigningContext { parent_hash: relay_parent, session_index: 1 },
-				)
-				.as_unchecked()
-				.clone();
-
 			send_peer_message(
 				&mut overseer,
 				peer_a.clone(),
-				protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement),
+				protocol_v2::StatementDistributionMessage::Statement(
+					relay_parent,
+					statement_a.clone(),
+				),
 			)
 			.await;
 
@@ -1724,12 +2346,12 @@ fn local_node_respects_statement_mask() {
 				}
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// `1` indicates statements NOT to request.
 		let mask = StatementFilter {
-			seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
+			seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
 			validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
 		};
 
@@ -1741,9 +2363,10 @@ fn local_node_respects_statement_mask() {
 					request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask },
 				)
 				.await
-				.await;
+				.await
+				.unwrap();
 
-			let expected_statements = vec![statement_b];
+			let expected_statements = vec![statement_a, statement_b];
 			assert_matches!(response, full_response => {
 				// Response is the same for v2.
 				let request_v2::AttestedCandidateResponse { candidate_receipt, persisted_validation_data, statements } =
@@ -1837,15 +2460,7 @@ fn should_delay_before_retrying_dropped_requests() {
 			send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await;
 		}
 
-		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
-
-		answer_expected_hypothetical_depth_request(
-			&mut overseer,
-			vec![],
-			Some(relay_parent),
-			false,
-		)
-		.await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
 
 		// Send gossip topology.
 		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
@@ -1984,7 +2599,7 @@ fn should_delay_before_retrying_dropped_requests() {
 					if p == peer_c && r == BENEFIT_VALID_RESPONSE.into()
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		// Sleep for the given amount of time. This should reset the delay for the first candidate.
@@ -2051,7 +2666,7 @@ fn should_delay_before_retrying_dropped_requests() {
 					if p == peer_c && r == BENEFIT_VALID_RESPONSE.into()
 			);
 
-			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![]).await;
 		}
 
 		overseer
diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md b/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md
index 86a1bf1214134..e6e597c531787 100644
--- a/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md
+++ b/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md
@@ -123,6 +123,31 @@ only send "importable" statements to the backing subsystem itself.
   backable and part of the hypothetical frontier.
 - Note that requesting is not an implicit acknowledgement, and an explicit acknowledgement must be sent upon receipt.
 
+### Disabled validators
+
+After a validator is disabled in the runtime, other validators should no longer
+accept statements from it. Filtering out of statements from disabled validators
+on the node side is purely an optimization, as it will be done in the runtime
+as well.
+
+Because we use the state of the active leaves to
+check whether a validator is disabled instead of the relay parent, the notion
+of being disabled is inherently racy:
+- the responder has learned about the disabled validator before the requester
+- the receiver has witnessed the disabled validator after sending the request
+
+We could have sent a manifest to a peer, then received information about
+disabling, and then receive a request. This can break an invariant of the grid
+mode:
+- the response is required to indicate quorum
+
+Due to the above, there should be no response at all for grid requests when
+the backing threshold is no longer met as a result of disabled validators.
+In addition to that, we add disabled validators to the request's unwanted
+mask. This ensures that the sender will not send statements from disabled
+validators (at least from the perspective of the receiver at the moment of the
+request). This doesn't fully avoid race conditions, but tries to minimize them.
+
 ## Messages
 
 ### Incoming
diff --git a/polkadot/zombienet_tests/functional/0010-validator-disabling.toml b/polkadot/zombienet_tests/functional/0010-validator-disabling.toml
new file mode 100644
index 0000000000000..6701d60d74d14
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0010-validator-disabling.toml
@@ -0,0 +1,39 @@
+[settings]
+timeout = 1000
+bootnode = true
+
+[relaychain.genesis.runtimeGenesis.patch.configuration.config]
+  max_validators_per_core = 1
+  needed_approvals = 2
+  group_rotation_frequency = 10
+
+[relaychain]
+default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
+chain = "westend-local" # for the disabling to take an effect
+default_command = "polkadot"
+
+[relaychain.default_resources]
+limits = { memory = "4G", cpu = "2" }
+requests = { memory = "2G", cpu = "1" }
+
+  [[relaychain.node_groups]]
+  name = "honest-validator"
+  count = 3
+  args = ["-lparachain=debug"]
+
+  [[relaychain.node_groups]]
+  image = "{{MALUS_IMAGE}}"
+  name = "malus-validator"
+  command = "malus suggest-garbage-candidate"
+  args = ["-lMALUS=trace"]
+  count = 1
+
+[[parachains]]
+id = 1000
+cumulus_based = true
+
+  [parachains.collator]
+  name = "alice"
+  command = "polkadot-parachain"
+  image = "{{CUMULUS_IMAGE}}"
+  args = ["-lparachain=debug"]
diff --git a/polkadot/zombienet_tests/functional/0010-validator-disabling.zndsl b/polkadot/zombienet_tests/functional/0010-validator-disabling.zndsl
new file mode 100644
index 0000000000000..c810266102061
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0010-validator-disabling.zndsl
@@ -0,0 +1,21 @@
+Description: Test validator disabling effects
+Network: ./0010-validator-disabling.toml
+Creds: config
+
+# Ensure nodes are up and running
+honest-validator: reports node_roles is 4
+
+# Ensure parachain is registered
+honest-validator: parachain 1000 is registered within 100 seconds
+
+# Ensure parachain made progress
+honest-validator: parachain 1000 block height is at least 1 within 300 seconds
+
+# Wait for the dispute
+honest-validator-1: reports parachain_candidate_disputes_total is at least 1 within 600 seconds
+
+# Disputes should conclude
+honest-validator: reports polkadot_parachain_candidate_dispute_concluded{validity="invalid"} is at least 1 within 200 seconds
+
+# Wait for a few blocks for the disabling to take place.
+honest-validator: log line contains "Disabled validators detected" within 180 seconds
diff --git a/prdoc/pr_1841.prdoc b/prdoc/pr_1841.prdoc
new file mode 100644
index 0000000000000..c99583e6dc309
--- /dev/null
+++ b/prdoc/pr_1841.prdoc
@@ -0,0 +1,18 @@
+title: Validator disabling in Statement Distribution.
+
+doc:
+  - audience: Node Operator
+    description: |
+      Once a validator has been disabled for misbehavior, other validators
+      should no longer gossip its backing statements in the current era.
+      If they do, it might result in disconnects from the network due to low
+      reputation.
+
+migrations:
+  db: []
+  runtime: []
+
+crates:
+  - name: polkadot-statement-distribution
+
+host_functions: []