Skip to content

Commit

Permalink
perf: Use MutableIntMap in SystemState (#3304)
Browse files Browse the repository at this point in the history
Use `MutableIntMap` instead of `BTreeMap` for all maps and priority
queues under `SystemState` with integer-like keys, making them virtually
free to clone (something we do a lot during execution and certification)
at the cost of up to 2x slower lookup and iteration, up to 3x slower
insert and up to 5x slower remove.

Also add a `MutableIntMap` tracking the count of callbacks per call
context, to avoid `CallContextManager::outstanding_calls` iterating over
the full list of callbacks every time.
  • Loading branch information
alin-at-dfinity authored Jan 10, 2025
1 parent df21455 commit 69981dc
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 124 deletions.
41 changes: 26 additions & 15 deletions rs/replicated_state/src/canister_state/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use self::message_pool::{
Context, InboundReference, Kind, MessagePool, OutboundReference, SomeReference,
};
use self::queue::{CanisterQueue, IngressQueue, InputQueue, OutputQueue};
use crate::page_map::int_map::MutableIntMap;
use crate::replicated_state::MR_SYNTHETIC_REJECT_MESSAGE_MAX_LEN;
use crate::{CanisterState, CheckpointLoadingMetrics, InputQueueType, InputSource, StateError};
use ic_base_types::PrincipalId;
Expand Down Expand Up @@ -164,7 +165,7 @@ pub struct CanisterQueues {
///
/// Used for response deduplication (whether due to a locally generated reject
/// response to a best-effort call; or due to a malicious / buggy subnet).
callbacks_with_enqueued_response: BTreeSet<CallbackId>,
callbacks_with_enqueued_response: MutableIntMap<CallbackId, ()>,
}

/// Circular iterator that consumes output queue messages: loops over output
Expand Down Expand Up @@ -364,13 +365,13 @@ struct MessageStoreImpl {
/// `CanisterInput::DeadlineExpired` by `peek_input()` / `pop_input()` (and
/// "inflated" by `SystemState` into `SysUnknown` reject responses based on the
/// callback).
expired_callbacks: BTreeMap<InboundReference, CallbackId>,
expired_callbacks: MutableIntMap<InboundReference, CallbackId>,

/// Compact reject responses (`CallbackIds`) replacing best-effort responses
/// that were shed. These are returned as `CanisterInput::ResponseDropped` by
/// `peek_input()` / `pop_input()` (and "inflated" by `SystemState` into
/// `SysUnknown` reject responses based on the callback).
shed_responses: BTreeMap<InboundReference, CallbackId>,
shed_responses: MutableIntMap<InboundReference, CallbackId>,
}

impl MessageStoreImpl {
Expand Down Expand Up @@ -554,7 +555,7 @@ trait InboundMessageStore: MessageStore<CanisterInput> {
fn callbacks_with_enqueued_response(
&self,
canister_queues: &BTreeMap<CanisterId, (Arc<InputQueue>, Arc<OutputQueue>)>,
) -> Result<BTreeSet<CallbackId>, String>;
) -> Result<MutableIntMap<CallbackId, ()>, String>;
}

impl InboundMessageStore for MessageStoreImpl {
Expand All @@ -567,8 +568,8 @@ impl InboundMessageStore for MessageStoreImpl {
fn callbacks_with_enqueued_response(
&self,
canister_queues: &BTreeMap<CanisterId, (Arc<InputQueue>, Arc<OutputQueue>)>,
) -> Result<BTreeSet<CallbackId>, String> {
let mut callbacks = BTreeSet::new();
) -> Result<MutableIntMap<CallbackId, ()>, String> {
let mut callbacks = MutableIntMap::new();
canister_queues
.values()
.flat_map(|(input_queue, _)| input_queue.iter())
Expand Down Expand Up @@ -603,7 +604,7 @@ impl InboundMessageStore for MessageStoreImpl {
}
};

if callbacks.insert(callback_id) {
if callbacks.insert(callback_id, ()).is_none() {
Ok(())
} else {
Err(format!(
Expand Down Expand Up @@ -758,9 +759,10 @@ impl CanisterQueues {
match self.canister_queues.get_mut(&sender) {
Some((queue, _)) if queue.check_has_reserved_response_slot().is_ok() => {
// Check against duplicate responses.
if !self
if self
.callbacks_with_enqueued_response
.insert(response.originator_reply_callback)
.insert(response.originator_reply_callback, ())
.is_some()
{
debug_assert_eq!(Ok(()), self.test_invariants());
if response.deadline == NO_DEADLINE {
Expand Down Expand Up @@ -796,7 +798,8 @@ impl CanisterQueues {
// aleady checked for a matching callback). Silently drop it.
debug_assert!(self
.callbacks_with_enqueued_response
.contains(&response.originator_reply_callback));
.get(&response.originator_reply_callback)
.is_some());
return Ok(false);
}
}
Expand Down Expand Up @@ -853,7 +856,11 @@ impl CanisterQueues {
};

// Check against duplicate responses.
if !self.callbacks_with_enqueued_response.insert(callback_id) {
if self
.callbacks_with_enqueued_response
.insert(callback_id, ())
.is_some()
{
// There is already a response enqueued for the callback.
return Ok(false);
}
Expand Down Expand Up @@ -920,7 +927,10 @@ impl CanisterQueues {

if let Some(msg_) = &msg {
if let Some(callback_id) = msg_.response_callback_id() {
assert!(self.callbacks_with_enqueued_response.remove(&callback_id));
assert!(self
.callbacks_with_enqueued_response
.remove(&callback_id)
.is_some());
}
debug_assert_eq!(Ok(()), self.test_invariants());
debug_assert_eq!(Ok(()), self.schedules_ok(&|_| InputQueueType::RemoteSubnet));
Expand Down Expand Up @@ -1559,7 +1569,8 @@ impl CanisterQueues {
// request that was still in an output queue.
assert!(self
.callbacks_with_enqueued_response
.insert(response.originator_reply_callback));
.insert(response.originator_reply_callback, ())
.is_none());
let reference = self.store.insert_inbound(response.into());
Arc::make_mut(input_queue).push_response(reference);

Expand Down Expand Up @@ -1742,7 +1753,7 @@ fn input_queue_type_fn<'a>(
impl From<&CanisterQueues> for pb_queues::CanisterQueues {
fn from(item: &CanisterQueues) -> Self {
fn callback_references_to_proto(
callback_references: &BTreeMap<message_pool::InboundReference, CallbackId>,
callback_references: &MutableIntMap<message_pool::InboundReference, CallbackId>,
) -> Vec<pb_queues::canister_queues::CallbackReference> {
callback_references
.iter()
Expand Down Expand Up @@ -1791,7 +1802,7 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can

fn callback_references_try_from_proto(
callback_references: Vec<pb_queues::canister_queues::CallbackReference>,
) -> Result<BTreeMap<message_pool::InboundReference, CallbackId>, ProxyDecodeError>
) -> Result<MutableIntMap<message_pool::InboundReference, CallbackId>, ProxyDecodeError>
{
callback_references
.into_iter()
Expand Down
91 changes: 66 additions & 25 deletions rs/replicated_state/src/canister_state/queues/message_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::CanisterInput;
use crate::page_map::int_map::{AsInt, MutableIntMap};
use ic_protobuf::proxy::{try_from_option_field, ProxyDecodeError};
use ic_protobuf::state::queues::v1 as pb_queues;
use ic_types::messages::{
Expand All @@ -8,7 +9,7 @@ use ic_types::time::CoarseTime;
use ic_types::{CountBytes, Time};
use ic_validate_eq::ValidateEq;
use ic_validate_eq_derive::ValidateEq;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeSet;
use std::marker::PhantomData;
use std::ops::{AddAssign, SubAssign};
use std::sync::Arc;
Expand Down Expand Up @@ -131,6 +132,33 @@ impl Id {
}
}

impl AsInt for Id {
type Repr = u64;

#[inline]
fn as_int(&self) -> u64 {
self.0
}
}

impl AsInt for (CoarseTime, Id) {
type Repr = u128;

#[inline]
fn as_int(&self) -> u128 {
(self.0.as_secs_since_unix_epoch() as u128) << 64 | self.1 .0 as u128
}
}

impl AsInt for (usize, Id) {
type Repr = u128;

#[inline]
fn as_int(&self) -> u128 {
(self.0 as u128) << 64 | self.1 .0 as u128
}
}

/// A typed reference -- inbound (`CanisterInput`) or outbound
/// (`RequestOrResponse`) -- to a message in the `MessagePool`.
#[derive(Debug)]
Expand Down Expand Up @@ -214,6 +242,15 @@ impl<T> From<Reference<T>> for Id {
}
}

impl<T> AsInt for Reference<T> {
type Repr = u64;

#[inline]
fn as_int(&self) -> u64 {
self.0
}
}

/// A reference to an inbound message (returned as a `CanisterInput`).
pub(super) type InboundReference = Reference<CanisterInput>;

Expand Down Expand Up @@ -327,7 +364,7 @@ impl TryFrom<pb_queues::canister_queues::CallbackReference> for CallbackReferenc
pub(super) struct MessagePool {
/// Pool contents.
#[validate_eq(CompareWithValidateEq)]
messages: BTreeMap<Id, RequestOrResponse>,
messages: MutableIntMap<Id, RequestOrResponse>,

/// Records the (implicit) deadlines of all the outbound guaranteed response
/// requests (only).
Expand All @@ -337,7 +374,7 @@ pub(super) struct MessagePool {
/// `outbound_guaranteed_request_deadlines.keys().collect() == messages.keys().filter(|id| (id.context(), id.class(), id.kind()) == (Context::Outbound, Class::GuaranteedResponse, Kind::Request)).collect()`
/// * The deadline matches the one recorded in `deadline_queue`:
/// `outbound_guaranteed_request_deadlines.iter().all(|(id, deadline)| deadline_queue.contains(&(deadline, id)))`
outbound_guaranteed_request_deadlines: BTreeMap<Id, CoarseTime>,
outbound_guaranteed_request_deadlines: MutableIntMap<Id, CoarseTime>,

/// Running message stats for the pool.
message_stats: MessageStats,
Expand All @@ -348,13 +385,13 @@ pub(super) struct MessagePool {
/// by deadline.
///
/// Message IDs break ties, ensuring deterministic ordering.
deadline_queue: BTreeSet<(CoarseTime, Id)>,
deadline_queue: MutableIntMap<(CoarseTime, Id), ()>,

/// Load shedding priority queue. Holds all best-effort messages, ordered by
/// size.
///
/// Message IDs break ties, ensuring deterministic ordering.
size_queue: BTreeSet<(usize, Id)>,
size_queue: MutableIntMap<(usize, Id), ()>,

/// A monotonically increasing counter used to generate unique message IDs.
message_id_generator: u64,
Expand Down Expand Up @@ -470,7 +507,7 @@ impl MessagePool {
// all best-effort messages except responses in input queues; plus guaranteed
// response requests in output queues
if actual_deadline != NO_DEADLINE {
self.deadline_queue.insert((actual_deadline, id));
self.deadline_queue.insert((actual_deadline, id), ());

// Record in the outbound guaranteed response deadline map, iff it's an outbound
// guaranteed response request.
Expand All @@ -483,7 +520,7 @@ impl MessagePool {

// Record in load shedding queue iff it's a best-effort message.
if class == Class::BestEffort {
self.size_queue.insert((size_bytes, id));
self.size_queue.insert((size_bytes, id), ());
}

reference
Expand Down Expand Up @@ -552,7 +589,7 @@ impl MessagePool {
.outbound_guaranteed_request_deadlines
.remove(&id)
.unwrap();
let removed = self.deadline_queue.remove(&(deadline, id));
let removed = self.deadline_queue.remove(&(deadline, id)).is_some();
debug_assert!(removed);
}

Expand All @@ -564,7 +601,7 @@ impl MessagePool {

// All other best-effort messages do expire.
(_, BestEffort, _) => {
let removed = self.deadline_queue.remove(&(msg.deadline(), id));
let removed = self.deadline_queue.remove(&(msg.deadline(), id)).is_some();
debug_assert!(removed);
}
}
Expand All @@ -573,7 +610,7 @@ impl MessagePool {
/// Removes the given message from the load shedding queue.
fn remove_from_size_queue(&mut self, id: Id, msg: &RequestOrResponse) {
if id.class() == Class::BestEffort {
let removed = self.size_queue.remove(&(msg.count_bytes(), id));
let removed = self.size_queue.remove(&(msg.count_bytes(), id)).is_some();
debug_assert!(removed);
}
}
Expand All @@ -582,7 +619,7 @@ impl MessagePool {
///
/// Time complexity: `O(log(self.len()))`.
pub(super) fn has_expired_deadlines(&self, now: Time) -> bool {
if let Some((deadline, _)) = self.deadline_queue.first() {
if let Some((deadline, _)) = self.deadline_queue.min_key() {
let now = CoarseTime::floor(now);
if *deadline < now {
return true;
Expand All @@ -602,7 +639,7 @@ impl MessagePool {
}

let now = CoarseTime::floor(now);
if self.deadline_queue.first().unwrap().0 >= now {
if self.deadline_queue.min_key().unwrap().0 >= now {
// No expired messages, bail out.
return Vec::new();
}
Expand All @@ -614,7 +651,7 @@ impl MessagePool {
// Take and return all expired messages.
let expired = temp
.into_iter()
.map(|(_, id)| {
.map(|((_, id), _)| {
let msg = self.take_impl(id).unwrap();
if id.is_outbound_guaranteed_request() {
self.outbound_guaranteed_request_deadlines.remove(&id);
Expand All @@ -633,7 +670,8 @@ impl MessagePool {
///
/// Time complexity: `O(log(self.len()))`.
pub(super) fn shed_largest_message(&mut self) -> Option<(SomeReference, RequestOrResponse)> {
if let Some((_, id)) = self.size_queue.pop_last() {
if let Some(&(size_bytes, id)) = self.size_queue.max_key() {
self.size_queue.remove(&(size_bytes, id)).unwrap();
debug_assert_eq!(Class::BestEffort, id.class());

let msg = self.take_impl(id).unwrap();
Expand Down Expand Up @@ -661,7 +699,7 @@ impl MessagePool {
/// `debug_assert!()` checks.
///
/// Time complexity: `O(n)`.
fn calculate_message_stats(messages: &BTreeMap<Id, RequestOrResponse>) -> MessageStats {
fn calculate_message_stats(messages: &MutableIntMap<Id, RequestOrResponse>) -> MessageStats {
let mut stats = MessageStats::default();
for (id, msg) in messages.iter() {
stats += MessageStats::stats_delta(msg, id.context());
Expand Down Expand Up @@ -754,11 +792,14 @@ impl MessagePool {
/// Time complexity: `O(n * log(n))`.
#[allow(clippy::type_complexity)]
fn calculate_priority_queues(
messages: &BTreeMap<Id, RequestOrResponse>,
outbound_guaranteed_request_deadlines: &BTreeMap<Id, CoarseTime>,
) -> (BTreeSet<(CoarseTime, Id)>, BTreeSet<(usize, Id)>) {
let mut expected_deadline_queue = BTreeSet::new();
let mut expected_size_queue = BTreeSet::new();
messages: &MutableIntMap<Id, RequestOrResponse>,
outbound_guaranteed_request_deadlines: &MutableIntMap<Id, CoarseTime>,
) -> (
MutableIntMap<(CoarseTime, Id), ()>,
MutableIntMap<(usize, Id), ()>,
) {
let mut expected_deadline_queue = MutableIntMap::new();
let mut expected_size_queue = MutableIntMap::new();
messages.iter().for_each(|(id, msg)| {
use Class::*;
use Context::*;
Expand All @@ -767,7 +808,7 @@ impl MessagePool {
// Outbound guaranteed response requests have (separately recorded) deadlines.
(Outbound, GuaranteedResponse, Request) => {
let deadline = outbound_guaranteed_request_deadlines.get(id).unwrap();
expected_deadline_queue.insert((*deadline, *id));
expected_deadline_queue.insert((*deadline, *id), ());
}

// All other guaranteed response messages neither expire nor can be shed.
Expand All @@ -776,13 +817,13 @@ impl MessagePool {
// Inbound best-effort responses don't have expiration deadlines, but can be
// shed.
(Inbound, BestEffort, Response) => {
expected_size_queue.insert((msg.count_bytes(), *id));
expected_size_queue.insert((msg.count_bytes(), *id), ());
}

// All other best-effort messages are enqueued in both priority queues.
(_, BestEffort, _) => {
expected_deadline_queue.insert((msg.deadline(), *id));
expected_size_queue.insert((msg.count_bytes(), *id));
expected_deadline_queue.insert((msg.deadline(), *id), ());
expected_size_queue.insert((msg.count_bytes(), *id), ());
}
}
});
Expand Down Expand Up @@ -821,7 +862,7 @@ impl TryFrom<pb_queues::MessagePool> for MessagePool {
fn try_from(item: pb_queues::MessagePool) -> Result<Self, Self::Error> {
let message_count = item.messages.len();

let messages: BTreeMap<_, _> = item
let messages: MutableIntMap<_, _> = item
.messages
.into_iter()
.map(|entry| {
Expand Down
Loading

0 comments on commit 69981dc

Please sign in to comment.