Skip to content

Commit

Permalink
aleph-client: Fetch contract events (#877)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmikolajczyk41 authored Jan 19, 2023
1 parent 725fd98 commit b87299f
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 91 deletions.
2 changes: 1 addition & 1 deletion aleph-client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aleph-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "aleph_client"
# TODO bump major version when API stablize
version = "2.9.1"
version = "2.10.0"
edition = "2021"
license = "Apache 2.0"

Expand Down
2 changes: 2 additions & 0 deletions aleph-client/src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ pub trait ConnectionApi: Sync {
/// Data regarding submitted transaction.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
pub struct TxInfo {
/// Hash of the block containing tx.
pub block_hash: BlockHash,
/// Hash of the transaction itself.
pub tx_hash: TxHash,
}

Expand Down
201 changes: 132 additions & 69 deletions aleph-client/src/contract/event.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,25 @@
//! Utilities for listening for contract events.
//! This module provides utilities corresponding to the events emitted by a contract.
//!
//! To use the module you will need to pass a connection, some contracts and an `UnboundedSender` to the
//! [listen_contract_events] function. You most likely want to `tokio::spawn` the resulting future, so that it runs
//! concurrently.
//!
//! ```no_run
//! # use std::sync::Arc;
//! # use std::sync::mpsc::channel;
//! # use std::time::Duration;
//! # use aleph_client::{AccountId, Connection, SignedConnection};
//! # use aleph_client::contract::ContractInstance;
//! # use aleph_client::contract::event::{listen_contract_events};
//! # use anyhow::Result;
//! use futures::{channel::mpsc::unbounded, StreamExt};
//!
//! # async fn example(conn: Connection, signed_conn: SignedConnection, address1: AccountId, address2: AccountId, path1: &str, path2: &str) -> Result<()> {
//! // The `Arc` makes it possible to pass a reference to the contract to another thread
//! let contract1 = Arc::new(ContractInstance::new(address1, path1)?);
//! let contract2 = Arc::new(ContractInstance::new(address2, path2)?);
//!
//! let conn_copy = conn.clone();
//! let contract1_copy = contract1.clone();
//! let contract2_copy = contract2.clone();
//!
//! let (tx, mut rx) = unbounded();
//! let listen = || async move {
//! listen_contract_events(&conn, &[contract1_copy.as_ref(), contract2_copy.as_ref()], tx).await?;
//! <Result<(), anyhow::Error>>::Ok(())
//! };
//! let join = tokio::spawn(listen());
//!
//! contract1.contract_exec0(&signed_conn, "some_method").await?;
//! contract2.contract_exec0(&signed_conn, "some_other_method").await?;
//!
//! println!("Received event {:?}", rx.next().await);
//!
//! rx.close();
//! join.await??;
//!
//! # Ok(())
//! # }
//! ```
//! There are two ways that you can get contract events:
//! 1. By fetching events corresponding to a particular transaction. For this, you will need to
//! provide a connection, contract instance and transaction coordinate to [get_contract_events]
//! function. Similarly to [crate::utility::BlocksApi::get_tx_events], it will fetch block
//! events, filter them and decode all relevant ones.
//! 2. By listening to all contract events. For this, you will need to provide a connection, some
//! contracts and an `UnboundedSender` to the [listen_contract_events] function. In a loop,
//! it will inspect every finalized block and look for contract events.
use std::collections::HashMap;
use std::{collections::HashMap, error::Error};

use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use contract_transcode::Value;
use futures::{channel::mpsc::UnboundedSender, StreamExt};
use subxt::events::EventDetails;

use crate::{contract::ContractInstance, AccountId, Connection};
use crate::{
api::contracts::events::ContractEmitted, connections::TxInfo, contract::ContractInstance,
utility::BlocksApi, AccountId, Connection,
};

/// Represents a single event emitted by a contract.
#[derive(Debug, Clone, Eq, PartialEq)]
Expand All @@ -61,13 +32,84 @@ pub struct ContractEvent {
pub data: HashMap<String, Value>,
}

/// Starts an event listening loop.
/// Fetch and decode all events that correspond to the call identified by `tx_info` made to
/// `contract`.
///
/// ```no_run
/// # use aleph_client::{AccountId, Connection, SignedConnection};
/// # use aleph_client::contract::ContractInstance;
/// # use aleph_client::contract::event::{get_contract_events, listen_contract_events};
/// # use anyhow::Result;
/// use futures::{channel::mpsc::unbounded, StreamExt};
///
/// # async fn example(conn: Connection, signed_conn: SignedConnection, address: AccountId, path: &str) -> Result<()> {
/// let contract = ContractInstance::new(address, path)?;
///
/// let tx_info = contract.contract_exec0(&signed_conn, "some_method").await?;
///
/// println!("Received events {:?}", get_contract_events(&conn, &contract, tx_info).await);
///
/// # Ok(())
/// # }
/// ```
pub async fn get_contract_events(
conn: &Connection,
contract: &ContractInstance,
tx_info: TxInfo,
) -> Result<Vec<ContractEvent>> {
let events = conn.get_tx_events(tx_info).await?;
translate_events(events.iter(), &[contract])
.into_iter()
.collect()
}

/// Starts an event listening loop. Will send contract event and every error encountered while
/// fetching through the provided [UnboundedSender].
///
/// Will send contract event and every error encountered while fetching through the provided [UnboundedSender].
/// Only events coming from the address of one of the `contracts` will be decoded.
///
/// The loop will terminate once `sender` is closed. The loop may also terminate in case of errors while fetching blocks
/// or decoding events (pallet events, contract event decoding errors are sent over the channel).
///
/// You most likely want to `tokio::spawn` the resulting future, so that it runs concurrently.
///
/// ```no_run
/// # use std::sync::Arc;
/// # use std::sync::mpsc::channel;
/// # use std::time::Duration;
/// # use aleph_client::{AccountId, Connection, SignedConnection};
/// # use aleph_client::contract::ContractInstance;
/// # use aleph_client::contract::event::{listen_contract_events};
/// # use anyhow::Result;
/// use futures::{channel::mpsc::unbounded, StreamExt};
///
/// # async fn example(conn: Connection, signed_conn: SignedConnection, address1: AccountId, address2: AccountId, path1: &str, path2: &str) -> Result<()> {
/// // The `Arc` makes it possible to pass a reference to the contract to another thread
/// let contract1 = Arc::new(ContractInstance::new(address1, path1)?);
/// let contract2 = Arc::new(ContractInstance::new(address2, path2)?);
///
/// let conn_copy = conn.clone();
/// let contract1_copy = contract1.clone();
/// let contract2_copy = contract2.clone();
///
/// let (tx, mut rx) = unbounded();
/// let listen = || async move {
/// listen_contract_events(&conn, &[contract1_copy.as_ref(), contract2_copy.as_ref()], tx).await?;
/// <Result<(), anyhow::Error>>::Ok(())
/// };
/// let join = tokio::spawn(listen());
///
/// contract1.contract_exec0(&signed_conn, "some_method").await?;
/// contract2.contract_exec0(&signed_conn, "some_other_method").await?;
///
/// println!("Received event {:?}", rx.next().await);
///
/// rx.close();
/// join.await??;
///
/// # Ok(())
/// # }
/// ```
pub async fn listen_contract_events(
conn: &Connection,
contracts: &[&ContractInstance],
Expand All @@ -79,33 +121,54 @@ pub async fn listen_contract_events(
if sender.is_closed() {
break;
}
let events = block?.events().await?;
for event in translate_events(events.iter(), contracts) {
sender.unbounded_send(event)?;
}
}

let block = block?;
Ok(())
}

for event in block.events().await?.iter() {
let event = event?;
/// Try to convert `events` to `ContractEvent` using matching contract from `contracts`.
fn translate_events<
Err: Error + Into<anyhow::Error> + Send + Sync + 'static,
E: Iterator<Item = Result<EventDetails, Err>>,
>(
events: E,
contracts: &[&ContractInstance],
) -> Vec<Result<ContractEvent>> {
events
.filter_map(|maybe_event| {
maybe_event
.map(|e| e.as_event::<ContractEmitted>().ok().flatten())
.transpose()
})
.map(|maybe_event| match maybe_event {
Ok(e) => translate_event(&e, contracts),
Err(e) => Err(anyhow::Error::from(e)),
})
.collect()
}

if let Some(event) =
event.as_event::<crate::api::contracts::events::ContractEmitted>()?
{
if let Some(contract) = contracts
.iter()
.find(|contract| contract.address() == &event.contract)
{
let data = zero_prefixed(&event.data);
let event = contract
.transcoder
.decode_contract_event(&mut data.as_slice());
/// Try to convert `event` to `ContractEvent` using matching contract from `contracts`.
fn translate_event(
event: &ContractEmitted,
contracts: &[&ContractInstance],
) -> Result<ContractEvent> {
let matching_contract = contracts
.iter()
.find(|contract| contract.address() == &event.contract)
.ok_or(anyhow!(
"The event wasn't emitted by any of the provided contracts"
))?;

sender.unbounded_send(
event.and_then(|event| build_event(contract.address().clone(), event)),
)?;
}
}
}
}
let data = zero_prefixed(&event.data);
let data = matching_contract
.transcoder
.decode_contract_event(&mut data.as_slice())?;

Ok(())
build_event(matching_contract.address.clone(), data)
}

/// The contract transcoder assumes there is an extra byte (that it discards) indicating the size of the data. However,
Expand Down
14 changes: 7 additions & 7 deletions aleph-client/src/contract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! ```no_run
//! # use anyhow::{Result, Context};
//! # use aleph_client::{AccountId, Balance};
//! # use aleph_client::{Connection, SignedConnection};
//! # use aleph_client::{Connection, SignedConnection, TxInfo};
//! # use aleph_client::contract::ContractInstance;
//! #
//! #[derive(Debug)]
Expand All @@ -24,7 +24,7 @@
//! })
//! }
//!
//! async fn transfer(&self, conn: &SignedConnection, to: AccountId, amount: Balance) -> Result<()> {
//! async fn transfer(&self, conn: &SignedConnection, to: AccountId, amount: Balance) -> Result<TxInfo> {
//! self.contract.contract_exec(
//! conn,
//! "PSP22::transfer",
Expand Down Expand Up @@ -52,6 +52,7 @@ use contract_transcode::ContractMessageTranscoder;
pub use convertible_value::ConvertibleValue;

use crate::{
connections::TxInfo,
contract_transcode::Value,
pallets::contract::{ContractCallArgs, ContractRpc, ContractsUserApi},
sp_weights::weight_v2::Weight,
Expand Down Expand Up @@ -128,7 +129,7 @@ impl ContractInstance {
&self,
conn: &C,
message: &str,
) -> Result<()> {
) -> Result<TxInfo> {
self.contract_exec::<C, String>(conn, message, &[]).await
}

Expand All @@ -138,7 +139,7 @@ impl ContractInstance {
conn: &C,
message: &str,
args: &[S],
) -> Result<()> {
) -> Result<TxInfo> {
self.contract_exec_value::<C, S>(conn, message, args, 0)
.await
}
Expand All @@ -149,7 +150,7 @@ impl ContractInstance {
conn: &C,
message: &str,
value: Balance,
) -> Result<()> {
) -> Result<TxInfo> {
self.contract_exec_value::<C, String>(conn, message, &[], value)
.await
}
Expand All @@ -161,7 +162,7 @@ impl ContractInstance {
message: &str,
args: &[S],
value: Balance,
) -> Result<()> {
) -> Result<TxInfo> {
let data = self.encode(message, args)?;
conn.call(
self.address.clone(),
Expand All @@ -175,7 +176,6 @@ impl ContractInstance {
TxStatus::InBlock,
)
.await
.map(|_| ())
}

fn encode<S: AsRef<str> + Debug>(&self, message: &str, args: &[S]) -> Result<Vec<u8>> {
Expand Down
2 changes: 1 addition & 1 deletion aleph-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub type SubxtClient = OnlineClient<AlephConfig>;

pub use connections::{
AsConnection, AsSigned, Connection, ConnectionApi, RootConnection, SignedConnection,
SignedConnectionApi, SudoCall,
SignedConnectionApi, SudoCall, TxInfo,
};

/// When submitting a transaction, wait for given status before proceeding.
Expand Down
2 changes: 1 addition & 1 deletion benches/payout-stakers/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/cliain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2e-tests/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b87299f

Please sign in to comment.