Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LPv2: Batch message logic #1923

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions libs/traits/src/liquidity_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ use sp_std::vec::Vec;
pub trait LPEncoding: Sized {
fn serialize(&self) -> Vec<u8>;
fn deserialize(input: &[u8]) -> Result<Self, DispatchError>;

/// Compose this message with a new one
fn pack(&self, other: Self) -> Result<Self, DispatchError>;

/// Decompose the message into a list of messages
/// If the message is not decomposable, it returns the own message.
fn unpack(&self) -> Vec<Self>;

/// Creates an empty message.
/// It's the identity message for composing messages
fn empty() -> Self;
}

#[cfg(any(test, feature = "std"))]
Expand All @@ -43,6 +54,18 @@ pub mod test_util {
None => Err("empty message".into()),
}
}

fn pack(&self, _: Self) -> Result<Self, DispatchError> {
unimplemented!()
}

fn unpack(&self) -> Vec<Self> {
vec![Self]
}

fn empty() -> Self {
unimplemented!()
}
}
}

Expand Down
213 changes: 119 additions & 94 deletions pallets/liquidity-pools-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ pub mod pallet {
(Domain, T::AccountId, T::Message, DispatchError),
>;

#[pallet::storage]
pub(crate) type PackedMessage<T: Config> =
StorageMap<_, Blake2_128Concat, (T::AccountId, Domain), T::Message>;

#[pallet::error]
pub enum Error<T> {
/// Router initialization failed.
Expand Down Expand Up @@ -301,6 +305,14 @@ pub mod pallet {

/// Failed outbound message not found in storage.
FailedOutboundMessageNotFound,

/// Emitted when you call `start_pack_message()` but that was already
/// called. You should finalize the message with `end_pack_message()`
MessagePackingAlreadyStarted,

/// Emitted when you can `end_pack_message()` but the packing process
/// was not started by `start_pack_message()`.
MessagePackingNotStarted,
}

#[pallet::hooks]
Expand Down Expand Up @@ -420,6 +432,8 @@ pub mod pallet {
/// Process an incoming message.
#[pallet::weight(T::WeightInfo::process_msg())]
#[pallet::call_index(5)]
// TODO: can we modify the name to receive_message() ?
// TODO: benchmark me again with max batch limit message
pub fn process_msg(
lemunozm marked this conversation as resolved.
Show resolved Hide resolved
origin: OriginFor<T>,
msg: BoundedVec<u8, T::MaxIncomingMessageSize>,
Expand Down Expand Up @@ -517,7 +531,11 @@ pub mod pallet {
}
};

T::InboundQueue::submit(domain_address, incoming_msg)
for msg in incoming_msg.unpack() {
T::InboundQueue::submit(domain_address.clone(), msg)?;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weight of this function must take into account the maximum batch size possible. Defined by the deserialization implemenation.

Copy link
Contributor Author

@lemunozm lemunozm Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really difficult to measure because, right now, it's "not measured". It's using hardcoded weights. The real weight of this does not depend on how many bytes we read from the message, it mostly depends of what messages are in the batch that will dispatch certain actions. i.e., if this message is an increase it will call to foreign investments and would imply swaps having a weight increase.

I'm not sure what kind of solution we want to follow here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got one which multiplies the current weight used by the number of messages in the batch


Ok(())
}

/// Convenience method for manually processing an outbound message.
Expand All @@ -535,7 +553,7 @@ pub mod pallet {
let (domain, sender, message) = OutboundMessageQueue::<T>::take(nonce)
.ok_or(Error::<T>::OutboundMessageNotFound)?;

match Self::process_message(domain.clone(), sender.clone(), message.clone()) {
match Self::send_message(domain.clone(), sender.clone(), message.clone()).0 {
Ok(_) => {
Self::deposit_event(Event::<T>::OutboundMessageExecutionSuccess {
nonce,
Expand All @@ -546,16 +564,16 @@ pub mod pallet {

Ok(())
}
Err(e) => {
Err(err) => {
Self::deposit_event(Event::<T>::OutboundMessageExecutionFailure {
nonce,
domain: domain.clone(),
sender: sender.clone(),
message: message.clone(),
error: e.error,
error: err,
});

FailedOutboundMessages::<T>::insert(nonce, (domain, sender, message, e.error));
FailedOutboundMessages::<T>::insert(nonce, (domain, sender, message, err));

Ok(())
}
Expand All @@ -574,7 +592,7 @@ pub mod pallet {
let (domain, sender, message, _) = FailedOutboundMessages::<T>::get(nonce)
.ok_or(Error::<T>::OutboundMessageNotFound)?;

match Self::process_message(domain.clone(), sender.clone(), message.clone()) {
match Self::send_message(domain.clone(), sender.clone(), message.clone()).0 {
Ok(_) => {
Self::deposit_event(Event::<T>::OutboundMessageExecutionSuccess {
nonce,
Expand All @@ -587,19 +605,47 @@ pub mod pallet {

Ok(())
}
Err(e) => {
Err(err) => {
Self::deposit_event(Event::<T>::OutboundMessageExecutionFailure {
nonce,
domain: domain.clone(),
sender: sender.clone(),
message: message.clone(),
error: e.error,
error: err,
});

Ok(())
}
}
}

#[pallet::call_index(8)]
#[pallet::weight(10_000 + T::DbWeight::get().reads_writes(1, 1).ref_time())]
pub fn start_pack_messages(origin: OriginFor<T>, destination: Domain) -> DispatchResult {
let sender = ensure_signed(origin)?;

PackedMessage::<T>::mutate((&sender, &destination), |msg| match msg {
Some(_) => return Err(Error::<T>::MessagePackingAlreadyStarted),
None => {
*msg = Some(T::Message::empty());
Ok(())
}
})?;

Ok(())
}

#[pallet::call_index(9)]
#[pallet::weight(10_000 + T::DbWeight::get().writes(1).ref_time())] //TODO: benchmark me
pub fn end_pack_messages(origin: OriginFor<T>, destination: Domain) -> DispatchResult {
let sender = ensure_signed(origin)?;

match PackedMessage::<T>::take((&sender, &destination)) {
Some(msg) if msg.unpack().is_empty() => Ok(()), //No-op
Some(msg) => Self::queue_message(destination, msg),
None => Err(Error::<T>::MessagePackingNotStarted.into()),
}
}
}

impl<T: Config> Pallet<T> {
Expand Down Expand Up @@ -655,49 +701,43 @@ pub mod pallet {
processed_entries.push(nonce);

let weight =
match Self::process_message(domain.clone(), sender.clone(), message.clone()) {
Ok(post_info) => {
match Self::send_message(domain.clone(), sender.clone(), message.clone()) {
(Ok(()), weight) => {
Self::deposit_event(Event::OutboundMessageExecutionSuccess {
nonce,
sender,
domain,
message,
});

post_info
.actual_weight
.expect("Message processing success already ensured")
// Extra weight breakdown:
//
// 1 read for the outbound message
// 1 write for the event
// 1 write for the outbound message removal
.saturating_add(T::DbWeight::get().reads_writes(1, 2))
// Extra weight breakdown:
//
// 1 read for the outbound message
// 1 write for the event
// 1 write for the outbound message removal
weight.saturating_add(T::DbWeight::get().reads_writes(1, 2))
}
Err(e) => {
(Err(err), weight) => {
Self::deposit_event(Event::OutboundMessageExecutionFailure {
nonce,
sender: sender.clone(),
domain: domain.clone(),
message: message.clone(),
error: e.error,
error: err,
});

FailedOutboundMessages::<T>::insert(
nonce,
(domain, sender, message, e.error),
(domain, sender, message, err),
);

e.post_info
.actual_weight
.expect("Message processing success already ensured")
// Extra weight breakdown:
//
// 1 read for the outbound message
// 1 write for the event
// 1 write for the failed outbound message
// 1 write for the outbound message removal
.saturating_add(T::DbWeight::get().reads_writes(1, 3))
// Extra weight breakdown:
//
// 1 read for the outbound message
// 1 write for the event
// 1 write for the failed outbound message
// 1 write for the outbound message removal
weight.saturating_add(T::DbWeight::get().reads_writes(1, 3))
}
};

Expand All @@ -718,60 +758,54 @@ pub mod pallet {
/// Retrieves the router stored for the provided domain and sends the
/// message, calculating and returning the required weight for these
/// operations in the `DispatchResultWithPostInfo`.
fn process_message(
fn send_message(
domain: Domain,
sender: T::AccountId,
message: T::Message,
) -> DispatchResultWithPostInfo {
) -> (DispatchResult, Weight) {
let read_weight = T::DbWeight::get().reads(1);

let router = DomainRouters::<T>::get(domain).ok_or(DispatchErrorWithPostInfo {
post_info: PostDispatchInfo {
actual_weight: Some(read_weight),
pays_fee: Pays::Yes,
},
error: Error::<T>::RouterNotFound.into(),
})?;
let Some(router) = DomainRouters::<T>::get(domain) else {
return (Err(Error::<T>::RouterNotFound.into()), read_weight);
};

let post_dispatch_info_fn =
|actual_weight: Option<Weight>, extra_weight: Weight| -> PostDispatchInfo {
PostDispatchInfo {
actual_weight: Some(Self::get_outbound_message_processing_weight(
actual_weight,
extra_weight,
)),
pays_fee: Pays::Yes,
}
};

match router.send(sender, message.serialize()) {
Ok(dispatch_info) => Ok(post_dispatch_info_fn(
dispatch_info.actual_weight,
read_weight,
)),
Err(e) => Err(DispatchErrorWithPostInfo {
post_info: post_dispatch_info_fn(e.post_info.actual_weight, read_weight),
error: e.error,
}),
}
let serialized = message.serialize();

let message_weight =
read_weight.saturating_add(Weight::from_parts(0, serialized.len() as u64));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Message PoV size evaluated here


let (result, router_weight) = match router.send(sender, serialized) {
Ok(dispatch_info) => (Ok(()), dispatch_info.actual_weight),
Err(e) => (Err(e.error), e.post_info.actual_weight),
};

(
result,
router_weight
.unwrap_or(Weight::from_parts(DEFAULT_WEIGHT_REF_TIME, 0))
.saturating_add(message_weight)
.saturating_add(read_weight),
)
}

/// Calculates the weight used by a router when processing an outbound
/// message.
fn get_outbound_message_processing_weight(
router_call_weight: Option<Weight>,
extra_weight: Weight,
) -> Weight {
let pov_weight: u64 = (Domain::max_encoded_len()
+ T::AccountId::max_encoded_len()
+ T::Message::max_encoded_len())
.try_into()
.expect("can calculate outbound message POV weight");

router_call_weight
.unwrap_or(Weight::from_parts(DEFAULT_WEIGHT_REF_TIME, 0))
.saturating_add(Weight::from_parts(0, pov_weight))
.saturating_add(extra_weight)
fn queue_message(destination: Domain, message: T::Message) -> DispatchResult {
let nonce = <OutboundMessageNonceStore<T>>::try_mutate(|n| {
n.ensure_add_assign(T::OutboundMessageNonce::one())?;
Ok::<T::OutboundMessageNonce, DispatchError>(*n)
})?;

OutboundMessageQueue::<T>::insert(
nonce,
(destination.clone(), T::Sender::get(), message.clone()),
);

Self::deposit_event(Event::OutboundMessageSubmitted {
sender: T::Sender::get(),
domain: destination,
message,
});

Ok(())
}
}

Expand All @@ -787,7 +821,7 @@ pub mod pallet {
type Sender = T::AccountId;

fn submit(
_sender: Self::Sender,
sender: Self::Sender,
destination: Self::Destination,
message: Self::Message,
) -> DispatchResult {
Expand All @@ -801,21 +835,12 @@ pub mod pallet {
Error::<T>::RouterNotFound
);

let nonce = <OutboundMessageNonceStore<T>>::try_mutate(|n| {
n.ensure_add_assign(T::OutboundMessageNonce::one())?;
Ok::<T::OutboundMessageNonce, DispatchError>(*n)
})?;

OutboundMessageQueue::<T>::insert(
nonce,
(destination.clone(), T::Sender::get(), message.clone()),
);

Self::deposit_event(Event::OutboundMessageSubmitted {
sender: T::Sender::get(),
domain: destination,
message,
});
match PackedMessage::<T>::get((&sender, &destination)) {
Some(packed) => {
PackedMessage::<T>::insert((sender, destination), packed.pack(message)?)
lemunozm marked this conversation as resolved.
Show resolved Hide resolved
}
None => Self::queue_message(destination, message)?,
}

Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions pallets/liquidity-pools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ pub mod pallet {
InvestorDomainAddressNotAMember,
/// Only the PoolAdmin can execute a given operation.
NotPoolAdmin,
/// This pallet does not expect to receive direclty a batch message,
/// instead it expects several calls to it with different messages.
UnsupportedBatchMessage,
}

#[pallet::call]
Expand Down Expand Up @@ -1038,6 +1041,7 @@ pub mod pallet {
currency.into(),
sender,
),
Message::Batch(_) => Err(Error::<T>::UnsupportedBatchMessage.into()),
_ => Err(Error::<T>::InvalidIncomingMessage.into()),
}?;

Expand Down
Loading