Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
MessageQueue: unknit permanently overweight books (#13528)
Browse files Browse the repository at this point in the history
* Unknit permanently overweight books

A book with only permanently overweight messages should be unkit
from the ready ring. This does currently not happen since perm.
overweight messages are not counted as "processed" and therefore
not increase the "total_processed" counter.

This is only a problem when the next and only message that is
processed is overweight. Eventually this should resolve itself
when another non-overweight message is enqueued and processed.
But for correctness it should be unknitted.

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add tests

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* fmt

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* One more tests

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* ".git/.scripts/commands/bench/bench.sh" pallet dev pallet-message-queue

---------

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: command-bot <>
  • Loading branch information
ggwpez authored Mar 7, 2023
1 parent 8a4746e commit 306382b
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 66 deletions.
4 changes: 2 additions & 2 deletions frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,11 +961,11 @@ impl<T: Config> Pallet<T> {
book_state.begin.saturating_inc();
}
let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
if book_state.begin >= book_state.end && total_processed > 0 {
if book_state.begin >= book_state.end {
// No longer ready - unknit.
if let Some(neighbours) = book_state.ready_neighbours.take() {
Self::ready_ring_unknit(&origin, neighbours);
} else {
} else if total_processed > 0 {
defensive!("Freshly processed queue must have been ready");
}
}
Expand Down
9 changes: 9 additions & 0 deletions frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,12 @@ pub fn knit(queue: &MessageOrigin) {
pub fn unknit(queue: &MessageOrigin) {
super::mock_helpers::unknit::<Test>(queue);
}

pub fn num_overweight_enqueued_events() -> u32 {
frame_system::Pallet::<Test>::events()
.into_iter()
.filter(|e| {
matches!(e.event, RuntimeEvent::MessageQueue(crate::Event::OverweightEnqueued { .. }))
})
.count() as u32
}
4 changes: 2 additions & 2 deletions frame/message-queue/src/mock_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ where
}

/// Create a message from the given data.
pub fn msg<N: Get<u32>>(x: &'static str) -> BoundedSlice<u8, N> {
pub fn msg<N: Get<u32>>(x: &str) -> BoundedSlice<u8, N> {
BoundedSlice::defensive_truncate_from(x.as_bytes())
}

pub fn vmsg(x: &'static str) -> Vec<u8> {
pub fn vmsg(x: &str) -> Vec<u8> {
x.as_bytes().to_vec()
}

Expand Down
115 changes: 115 additions & 0 deletions frame/message-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,121 @@ fn execute_overweight_works() {
assert_eq!(consumed, Err(ExecuteOverweightError::NotFound));
assert!(QueueChanges::take().is_empty());
assert!(!Pages::<Test>::contains_key(origin, 0), "Page is gone");
// The book should have been unknit from the ready ring.
assert!(!ServiceHead::<Test>::exists(), "No ready book");
});
}

#[test]
fn permanently_overweight_book_unknits() {
use MessageOrigin::*;

new_test_ext::<Test>().execute_with(|| {
set_weight("bump_service_head", 1.into_weight());
set_weight("service_queue_base", 1.into_weight());
set_weight("service_page_base_completion", 1.into_weight());

MessageQueue::enqueue_messages([msg("weight=9")].into_iter(), Here);

// It is the only ready book.
assert_ring(&[Here]);
// Mark the message as overweight.
assert_eq!(MessageQueue::service_queues(8.into_weight()), 4.into_weight());
assert_last_event::<Test>(
Event::OverweightEnqueued {
hash: <Test as frame_system::Config>::Hashing::hash(b"weight=9"),
origin: Here,
message_index: 0,
page_index: 0,
}
.into(),
);
// The book is not ready anymore.
assert_ring(&[]);
assert_eq!(MessagesProcessed::take().len(), 0);
assert_eq!(BookStateFor::<Test>::get(Here).message_count, 1);
// Now if we enqueue another message, it will become ready again.
MessageQueue::enqueue_messages([msg("weight=1")].into_iter(), Here);
assert_ring(&[Here]);
assert_eq!(MessageQueue::service_queues(8.into_weight()), 5.into_weight());
assert_eq!(MessagesProcessed::take().len(), 1);
assert_ring(&[]);
});
}

#[test]
fn permanently_overweight_book_unknits_multiple() {
use MessageOrigin::*;

new_test_ext::<Test>().execute_with(|| {
set_weight("bump_service_head", 1.into_weight());
set_weight("service_queue_base", 1.into_weight());
set_weight("service_page_base_completion", 1.into_weight());

MessageQueue::enqueue_messages(
[msg("weight=1"), msg("weight=9"), msg("weight=9")].into_iter(),
Here,
);

assert_ring(&[Here]);
// Process the first message.
assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
assert_eq!(num_overweight_enqueued_events(), 0);
assert_eq!(MessagesProcessed::take().len(), 1);

// Book is still ready since it was not marked as overweight yet.
assert_ring(&[Here]);
assert_eq!(MessageQueue::service_queues(8.into_weight()), 5.into_weight());
assert_eq!(num_overweight_enqueued_events(), 2);
assert_eq!(MessagesProcessed::take().len(), 0);
// Now it is overweight.
assert_ring(&[]);
// Enqueue another message.
MessageQueue::enqueue_messages([msg("weight=1")].into_iter(), Here);
assert_ring(&[Here]);
assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
assert_eq!(MessagesProcessed::take().len(), 1);
assert_ring(&[]);
});
}

/// We don't want empty books in the ready ring, but if they somehow make their way in there, it
/// should not panic.
#[test]
#[cfg(not(debug_assertions))] // Would trigger a defensive failure otherwise.
fn ready_but_empty_does_not_panic() {
use MessageOrigin::*;

new_test_ext::<Test>().execute_with(|| {
BookStateFor::<Test>::insert(Here, empty_book::<Test>());
BookStateFor::<Test>::insert(There, empty_book::<Test>());

knit(&Here);
knit(&There);
assert_ring(&[Here, There]);

assert_eq!(MessageQueue::service_queues(Weight::MAX), 0.into_weight());
assert_ring(&[]);
});
}

/// We don't want permanently books in the ready ring, but if they somehow make their way in there,
/// it should not panic.
#[test]
#[cfg(not(debug_assertions))] // Would trigger a defensive failure otherwise.
fn ready_but_perm_overweight_does_not_panic() {
use MessageOrigin::*;

new_test_ext::<Test>().execute_with(|| {
MessageQueue::enqueue_message(msg("weight=9"), Here);
assert_eq!(MessageQueue::service_queues(8.into_weight()), 0.into_weight());
assert_ring(&[]);
// Force it back into the ready ring.
knit(&Here);
assert_ring(&[Here]);
assert_eq!(MessageQueue::service_queues(Weight::MAX), 0.into_weight());
// Unready again.
assert_ring(&[]);
});
}

Expand Down
Loading

0 comments on commit 306382b

Please sign in to comment.