Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LPv2: Batch message logic #1923

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions libs/traits/src/liquidity_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ use sp_std::vec::Vec;
pub trait LPEncoding: Sized {
fn serialize(&self) -> Vec<u8>;
fn deserialize(input: &[u8]) -> Result<Self, DispatchError>;

/// Compose this message with a new one
fn pack(&self, other: Self) -> Result<Self, DispatchError>;

/// Decompose the message into a list of messages
/// If the message is not decomposable, it returns the own message.
fn unpack(&self) -> Vec<Self>;

/// Creates an empty message.
/// It's the identity message for composing messages
fn empty() -> Self;
}

#[cfg(any(test, feature = "std"))]
Expand All @@ -43,6 +54,18 @@ pub mod test_util {
None => Err("empty message".into()),
}
}

fn pack(&self, _: Self) -> Result<Self, DispatchError> {
unimplemented!()
}

fn unpack(&self) -> Vec<Self> {
vec![Self]
}

fn empty() -> Self {
unimplemented!()
}
}
}

Expand Down
77 changes: 60 additions & 17 deletions pallets/liquidity-pools-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ pub mod pallet {
(Domain, T::AccountId, T::Message, DispatchError),
>;

#[pallet::storage]
pub(crate) type PackedMessage<T: Config> =
StorageMap<_, Blake2_128Concat, (T::AccountId, Domain), T::Message>;

#[pallet::error]
pub enum Error<T> {
/// Router initialization failed.
Expand Down Expand Up @@ -517,7 +521,11 @@ pub mod pallet {
}
};

T::InboundQueue::submit(domain_address, incoming_msg)
for msg in incoming_msg.unpack() {
T::InboundQueue::submit(domain_address.clone(), msg)?;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weight of this function must take into account the maximum batch size possible. Defined by the deserialization implemenation.

Copy link
Contributor Author

@lemunozm lemunozm Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really difficult to measure because, right now, it's "not measured". It's using hardcoded weights. The real weight of this does not depend on how many bytes we read from the message, it mostly depends of what messages are in the batch that will dispatch certain actions. i.e., if this message is an increase it will call to foreign investments and would imply swaps having a weight increase.

I'm not sure what kind of solution we want to follow here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got one which multiplies the current weight used by the number of messages in the batch


Ok(())
}

/// Convenience method for manually processing an outbound message.
Expand Down Expand Up @@ -600,6 +608,30 @@ pub mod pallet {
}
}
}

#[pallet::call_index(8)]
#[pallet::weight(10_000 + T::DbWeight::get().writes(1).ref_time())]
pub fn start_pack_messages(origin: OriginFor<T>, destination: Domain) -> DispatchResult {
let sender = ensure_signed(origin)?;

if PackedMessage::<T>::get((&sender, &destination)).is_none() {
PackedMessage::<T>::insert((sender, destination), T::Message::empty());
}
lemunozm marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}

#[pallet::call_index(9)]
#[pallet::weight(10_000 + T::DbWeight::get().writes(1).ref_time())] //TODO: benchmark me
pub fn end_pack_messages(origin: OriginFor<T>, destination: Domain) -> DispatchResult {
let sender = ensure_signed(origin)?;

if let Some(msg) = PackedMessage::<T>::take((&sender, &destination)) {
Self::queue_message(destination, msg)?;
}
lemunozm marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
}

impl<T: Config> Pallet<T> {
Expand Down Expand Up @@ -773,6 +805,26 @@ pub mod pallet {
.saturating_add(Weight::from_parts(0, pov_weight))
.saturating_add(extra_weight)
}

fn queue_message(destination: Domain, message: T::Message) -> DispatchResult {
let nonce = <OutboundMessageNonceStore<T>>::try_mutate(|n| {
n.ensure_add_assign(T::OutboundMessageNonce::one())?;
Ok::<T::OutboundMessageNonce, DispatchError>(*n)
})?;

OutboundMessageQueue::<T>::insert(
nonce,
(destination.clone(), T::Sender::get(), message.clone()),
);

Self::deposit_event(Event::OutboundMessageSubmitted {
sender: T::Sender::get(),
domain: destination,
message,
});

Ok(())
}
}

/// This pallet will be the `OutboundQueue` used by other pallets to send
Expand All @@ -787,7 +839,7 @@ pub mod pallet {
type Sender = T::AccountId;

fn submit(
_sender: Self::Sender,
sender: Self::Sender,
destination: Self::Destination,
message: Self::Message,
) -> DispatchResult {
Expand All @@ -801,21 +853,12 @@ pub mod pallet {
Error::<T>::RouterNotFound
);

let nonce = <OutboundMessageNonceStore<T>>::try_mutate(|n| {
n.ensure_add_assign(T::OutboundMessageNonce::one())?;
Ok::<T::OutboundMessageNonce, DispatchError>(*n)
})?;

OutboundMessageQueue::<T>::insert(
nonce,
(destination.clone(), T::Sender::get(), message.clone()),
);

Self::deposit_event(Event::OutboundMessageSubmitted {
sender: T::Sender::get(),
domain: destination,
message,
});
match PackedMessage::<T>::get((&sender, &destination)) {
Some(packed) => {
PackedMessage::<T>::insert((sender, destination), packed.pack(message)?)
lemunozm marked this conversation as resolved.
Show resolved Hide resolved
}
None => Self::queue_message(destination, message)?,
}

Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions pallets/liquidity-pools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ pub mod pallet {
InvestorDomainAddressNotAMember,
/// Only the PoolAdmin can execute a given operation.
NotPoolAdmin,
/// This pallet does not expect to receive direclty a batch message,
/// instead it expects several calls to it with different messages.
UnsupportedBatchMessage,
}

#[pallet::call]
Expand Down Expand Up @@ -1038,6 +1041,7 @@ pub mod pallet {
currency.into(),
sender,
),
Message::Batch(_) => Err(Error::<T>::UnsupportedBatchMessage.into()),
_ => Err(Error::<T>::InvalidIncomingMessage.into()),
}?;

Expand Down
28 changes: 15 additions & 13 deletions pallets/liquidity-pools/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,18 @@ pub enum Message<BatchContent = BatchMessages> {
},
}

impl Message {
/// Compose this message with a new one
pub fn pack(&self, other: Self) -> Result<Self, DispatchError> {
impl Message {}

impl LPEncoding for Message {
fn serialize(&self) -> Vec<u8> {
gmpf::to_vec(self).unwrap_or_default()
}

fn deserialize(data: &[u8]) -> Result<Self, DispatchError> {
gmpf::from_slice(data).map_err(|_| DispatchError::Other("LP Deserialization issue"))
}

fn pack(&self, other: Self) -> Result<Self, DispatchError> {
Ok(match self.clone() {
Message::Batch(content) => {
let mut content = content.clone();
Expand All @@ -542,22 +551,15 @@ impl Message {
})
}

/// Decompose the message into a list of messages
pub fn unpack(&self) -> Vec<Self> {
fn unpack(&self) -> Vec<Self> {
match self {
Message::Batch(content) => content.clone().into_iter().collect(),
message => vec![message.clone()],
}
}
}

impl LPEncoding for Message {
fn serialize(&self) -> Vec<u8> {
gmpf::to_vec(self).unwrap_or_default()
}

fn deserialize(data: &[u8]) -> Result<Self, DispatchError> {
gmpf::from_slice(data).map_err(|_| DispatchError::Other("LP Deserialization issue"))
fn empty() -> Message {
Message::Batch(BatchMessages::default())
}
}

Expand Down