From b87299f75357530d377cdec64a6de72819a52c14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Thu, 19 Jan 2023 15:34:04 +0100 Subject: [PATCH] `aleph-client`: Fetch contract events (#877) --- aleph-client/Cargo.lock | 2 +- aleph-client/Cargo.toml | 2 +- aleph-client/src/connections.rs | 2 + aleph-client/src/contract/event.rs | 201 +++++++++++++++++++---------- aleph-client/src/contract/mod.rs | 14 +- aleph-client/src/lib.rs | 2 +- benches/payout-stakers/Cargo.lock | 2 +- bin/cliain/Cargo.lock | 2 +- e2e-tests/Cargo.lock | 2 +- e2e-tests/src/test/adder.rs | 62 +++++++-- flooder/Cargo.lock | 2 +- 11 files changed, 202 insertions(+), 91 deletions(-) diff --git a/aleph-client/Cargo.lock b/aleph-client/Cargo.lock index db361dd68b..52ef28bbd8 100644 --- a/aleph-client/Cargo.lock +++ b/aleph-client/Cargo.lock @@ -49,7 +49,7 @@ dependencies = [ [[package]] name = "aleph_client" -version = "2.9.1" +version = "2.10.0" dependencies = [ "anyhow", "async-trait", diff --git a/aleph-client/Cargo.toml b/aleph-client/Cargo.toml index 9e41fc6f28..841c78085e 100644 --- a/aleph-client/Cargo.toml +++ b/aleph-client/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "aleph_client" # TODO bump major version when API stablize -version = "2.9.1" +version = "2.10.0" edition = "2021" license = "Apache 2.0" diff --git a/aleph-client/src/connections.rs b/aleph-client/src/connections.rs index 78f7d49db8..fee84fa234 100644 --- a/aleph-client/src/connections.rs +++ b/aleph-client/src/connections.rs @@ -113,7 +113,9 @@ pub trait ConnectionApi: Sync { /// Data regarding submitted transaction. #[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize, Serialize)] pub struct TxInfo { + /// Hash of the block containing tx. pub block_hash: BlockHash, + /// Hash of the transaction itself. pub tx_hash: TxHash, } diff --git a/aleph-client/src/contract/event.rs b/aleph-client/src/contract/event.rs index 7f8a855d86..83f02a6e5b 100644 --- a/aleph-client/src/contract/event.rs +++ b/aleph-client/src/contract/event.rs @@ -1,54 +1,25 @@ -//! Utilities for listening for contract events. +//! This module provides utilities corresponding to the events emitted by a contract. //! -//! To use the module you will need to pass a connection, some contracts and an `UnboundedSender` to the -//! [listen_contract_events] function. You most likely want to `tokio::spawn` the resulting future, so that it runs -//! concurrently. -//! -//! ```no_run -//! # use std::sync::Arc; -//! # use std::sync::mpsc::channel; -//! # use std::time::Duration; -//! # use aleph_client::{AccountId, Connection, SignedConnection}; -//! # use aleph_client::contract::ContractInstance; -//! # use aleph_client::contract::event::{listen_contract_events}; -//! # use anyhow::Result; -//! use futures::{channel::mpsc::unbounded, StreamExt}; -//! -//! # async fn example(conn: Connection, signed_conn: SignedConnection, address1: AccountId, address2: AccountId, path1: &str, path2: &str) -> Result<()> { -//! // The `Arc` makes it possible to pass a reference to the contract to another thread -//! let contract1 = Arc::new(ContractInstance::new(address1, path1)?); -//! let contract2 = Arc::new(ContractInstance::new(address2, path2)?); -//! -//! let conn_copy = conn.clone(); -//! let contract1_copy = contract1.clone(); -//! let contract2_copy = contract2.clone(); -//! -//! let (tx, mut rx) = unbounded(); -//! let listen = || async move { -//! listen_contract_events(&conn, &[contract1_copy.as_ref(), contract2_copy.as_ref()], tx).await?; -//! >::Ok(()) -//! }; -//! let join = tokio::spawn(listen()); -//! -//! contract1.contract_exec0(&signed_conn, "some_method").await?; -//! contract2.contract_exec0(&signed_conn, "some_other_method").await?; -//! -//! println!("Received event {:?}", rx.next().await); -//! -//! rx.close(); -//! join.await??; -//! -//! # Ok(()) -//! # } -//! ``` +//! There are two ways that you can get contract events: +//! 1. By fetching events corresponding to a particular transaction. For this, you will need to +//! provide a connection, contract instance and transaction coordinate to [get_contract_events] +//! function. Similarly to [crate::utility::BlocksApi::get_tx_events], it will fetch block +//! events, filter them and decode all relevant ones. +//! 2. By listening to all contract events. For this, you will need to provide a connection, some +//! contracts and an `UnboundedSender` to the [listen_contract_events] function. In a loop, +//! it will inspect every finalized block and look for contract events. -use std::collections::HashMap; +use std::{collections::HashMap, error::Error}; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use contract_transcode::Value; use futures::{channel::mpsc::UnboundedSender, StreamExt}; +use subxt::events::EventDetails; -use crate::{contract::ContractInstance, AccountId, Connection}; +use crate::{ + api::contracts::events::ContractEmitted, connections::TxInfo, contract::ContractInstance, + utility::BlocksApi, AccountId, Connection, +}; /// Represents a single event emitted by a contract. #[derive(Debug, Clone, Eq, PartialEq)] @@ -61,13 +32,84 @@ pub struct ContractEvent { pub data: HashMap, } -/// Starts an event listening loop. +/// Fetch and decode all events that correspond to the call identified by `tx_info` made to +/// `contract`. +/// +/// ```no_run +/// # use aleph_client::{AccountId, Connection, SignedConnection}; +/// # use aleph_client::contract::ContractInstance; +/// # use aleph_client::contract::event::{get_contract_events, listen_contract_events}; +/// # use anyhow::Result; +/// use futures::{channel::mpsc::unbounded, StreamExt}; +/// +/// # async fn example(conn: Connection, signed_conn: SignedConnection, address: AccountId, path: &str) -> Result<()> { +/// let contract = ContractInstance::new(address, path)?; +/// +/// let tx_info = contract.contract_exec0(&signed_conn, "some_method").await?; +/// +/// println!("Received events {:?}", get_contract_events(&conn, &contract, tx_info).await); +/// +/// # Ok(()) +/// # } +/// ``` +pub async fn get_contract_events( + conn: &Connection, + contract: &ContractInstance, + tx_info: TxInfo, +) -> Result> { + let events = conn.get_tx_events(tx_info).await?; + translate_events(events.iter(), &[contract]) + .into_iter() + .collect() +} + +/// Starts an event listening loop. Will send contract event and every error encountered while +/// fetching through the provided [UnboundedSender]. /// -/// Will send contract event and every error encountered while fetching through the provided [UnboundedSender]. /// Only events coming from the address of one of the `contracts` will be decoded. /// /// The loop will terminate once `sender` is closed. The loop may also terminate in case of errors while fetching blocks /// or decoding events (pallet events, contract event decoding errors are sent over the channel). +/// +/// You most likely want to `tokio::spawn` the resulting future, so that it runs concurrently. +/// +/// ```no_run +/// # use std::sync::Arc; +/// # use std::sync::mpsc::channel; +/// # use std::time::Duration; +/// # use aleph_client::{AccountId, Connection, SignedConnection}; +/// # use aleph_client::contract::ContractInstance; +/// # use aleph_client::contract::event::{listen_contract_events}; +/// # use anyhow::Result; +/// use futures::{channel::mpsc::unbounded, StreamExt}; +/// +/// # async fn example(conn: Connection, signed_conn: SignedConnection, address1: AccountId, address2: AccountId, path1: &str, path2: &str) -> Result<()> { +/// // The `Arc` makes it possible to pass a reference to the contract to another thread +/// let contract1 = Arc::new(ContractInstance::new(address1, path1)?); +/// let contract2 = Arc::new(ContractInstance::new(address2, path2)?); +/// +/// let conn_copy = conn.clone(); +/// let contract1_copy = contract1.clone(); +/// let contract2_copy = contract2.clone(); +/// +/// let (tx, mut rx) = unbounded(); +/// let listen = || async move { +/// listen_contract_events(&conn, &[contract1_copy.as_ref(), contract2_copy.as_ref()], tx).await?; +/// >::Ok(()) +/// }; +/// let join = tokio::spawn(listen()); +/// +/// contract1.contract_exec0(&signed_conn, "some_method").await?; +/// contract2.contract_exec0(&signed_conn, "some_other_method").await?; +/// +/// println!("Received event {:?}", rx.next().await); +/// +/// rx.close(); +/// join.await??; +/// +/// # Ok(()) +/// # } +/// ``` pub async fn listen_contract_events( conn: &Connection, contracts: &[&ContractInstance], @@ -79,33 +121,54 @@ pub async fn listen_contract_events( if sender.is_closed() { break; } + let events = block?.events().await?; + for event in translate_events(events.iter(), contracts) { + sender.unbounded_send(event)?; + } + } - let block = block?; + Ok(()) +} - for event in block.events().await?.iter() { - let event = event?; +/// Try to convert `events` to `ContractEvent` using matching contract from `contracts`. +fn translate_events< + Err: Error + Into + Send + Sync + 'static, + E: Iterator>, +>( + events: E, + contracts: &[&ContractInstance], +) -> Vec> { + events + .filter_map(|maybe_event| { + maybe_event + .map(|e| e.as_event::().ok().flatten()) + .transpose() + }) + .map(|maybe_event| match maybe_event { + Ok(e) => translate_event(&e, contracts), + Err(e) => Err(anyhow::Error::from(e)), + }) + .collect() +} - if let Some(event) = - event.as_event::()? - { - if let Some(contract) = contracts - .iter() - .find(|contract| contract.address() == &event.contract) - { - let data = zero_prefixed(&event.data); - let event = contract - .transcoder - .decode_contract_event(&mut data.as_slice()); +/// Try to convert `event` to `ContractEvent` using matching contract from `contracts`. +fn translate_event( + event: &ContractEmitted, + contracts: &[&ContractInstance], +) -> Result { + let matching_contract = contracts + .iter() + .find(|contract| contract.address() == &event.contract) + .ok_or(anyhow!( + "The event wasn't emitted by any of the provided contracts" + ))?; - sender.unbounded_send( - event.and_then(|event| build_event(contract.address().clone(), event)), - )?; - } - } - } - } + let data = zero_prefixed(&event.data); + let data = matching_contract + .transcoder + .decode_contract_event(&mut data.as_slice())?; - Ok(()) + build_event(matching_contract.address.clone(), data) } /// The contract transcoder assumes there is an extra byte (that it discards) indicating the size of the data. However, diff --git a/aleph-client/src/contract/mod.rs b/aleph-client/src/contract/mod.rs index 575b57bc34..077fa4eaf9 100644 --- a/aleph-client/src/contract/mod.rs +++ b/aleph-client/src/contract/mod.rs @@ -6,7 +6,7 @@ //! ```no_run //! # use anyhow::{Result, Context}; //! # use aleph_client::{AccountId, Balance}; -//! # use aleph_client::{Connection, SignedConnection}; +//! # use aleph_client::{Connection, SignedConnection, TxInfo}; //! # use aleph_client::contract::ContractInstance; //! # //! #[derive(Debug)] @@ -24,7 +24,7 @@ //! }) //! } //! -//! async fn transfer(&self, conn: &SignedConnection, to: AccountId, amount: Balance) -> Result<()> { +//! async fn transfer(&self, conn: &SignedConnection, to: AccountId, amount: Balance) -> Result { //! self.contract.contract_exec( //! conn, //! "PSP22::transfer", @@ -52,6 +52,7 @@ use contract_transcode::ContractMessageTranscoder; pub use convertible_value::ConvertibleValue; use crate::{ + connections::TxInfo, contract_transcode::Value, pallets::contract::{ContractCallArgs, ContractRpc, ContractsUserApi}, sp_weights::weight_v2::Weight, @@ -128,7 +129,7 @@ impl ContractInstance { &self, conn: &C, message: &str, - ) -> Result<()> { + ) -> Result { self.contract_exec::(conn, message, &[]).await } @@ -138,7 +139,7 @@ impl ContractInstance { conn: &C, message: &str, args: &[S], - ) -> Result<()> { + ) -> Result { self.contract_exec_value::(conn, message, args, 0) .await } @@ -149,7 +150,7 @@ impl ContractInstance { conn: &C, message: &str, value: Balance, - ) -> Result<()> { + ) -> Result { self.contract_exec_value::(conn, message, &[], value) .await } @@ -161,7 +162,7 @@ impl ContractInstance { message: &str, args: &[S], value: Balance, - ) -> Result<()> { + ) -> Result { let data = self.encode(message, args)?; conn.call( self.address.clone(), @@ -175,7 +176,6 @@ impl ContractInstance { TxStatus::InBlock, ) .await - .map(|_| ()) } fn encode + Debug>(&self, message: &str, args: &[S]) -> Result> { diff --git a/aleph-client/src/lib.rs b/aleph-client/src/lib.rs index 9986b58dce..af364edb7e 100644 --- a/aleph-client/src/lib.rs +++ b/aleph-client/src/lib.rs @@ -61,7 +61,7 @@ pub type SubxtClient = OnlineClient; pub use connections::{ AsConnection, AsSigned, Connection, ConnectionApi, RootConnection, SignedConnection, - SignedConnectionApi, SudoCall, + SignedConnectionApi, SudoCall, TxInfo, }; /// When submitting a transaction, wait for given status before proceeding. diff --git a/benches/payout-stakers/Cargo.lock b/benches/payout-stakers/Cargo.lock index 8687fedfed..d548ecc86d 100644 --- a/benches/payout-stakers/Cargo.lock +++ b/benches/payout-stakers/Cargo.lock @@ -49,7 +49,7 @@ dependencies = [ [[package]] name = "aleph_client" -version = "2.9.1" +version = "2.10.0" dependencies = [ "anyhow", "async-trait", diff --git a/bin/cliain/Cargo.lock b/bin/cliain/Cargo.lock index 07139815d1..42df8358a5 100644 --- a/bin/cliain/Cargo.lock +++ b/bin/cliain/Cargo.lock @@ -49,7 +49,7 @@ dependencies = [ [[package]] name = "aleph_client" -version = "2.9.1" +version = "2.10.0" dependencies = [ "anyhow", "async-trait", diff --git a/e2e-tests/Cargo.lock b/e2e-tests/Cargo.lock index 26bc1b971e..b60ada34de 100644 --- a/e2e-tests/Cargo.lock +++ b/e2e-tests/Cargo.lock @@ -78,7 +78,7 @@ dependencies = [ [[package]] name = "aleph_client" -version = "2.9.1" +version = "2.10.0" dependencies = [ "anyhow", "async-trait", diff --git a/e2e-tests/src/test/adder.rs b/e2e-tests/src/test/adder.rs index c1d71be842..5592a8c669 100644 --- a/e2e-tests/src/test/adder.rs +++ b/e2e-tests/src/test/adder.rs @@ -1,20 +1,24 @@ use std::{fmt::Debug, str::FromStr, sync::Arc}; use aleph_client::{ - contract::{event::listen_contract_events, ContractInstance}, + contract::{ + event::{get_contract_events, listen_contract_events}, + ContractInstance, + }, contract_transcode::Value, - AccountId, ConnectionApi, SignedConnectionApi, + AccountId, ConnectionApi, SignedConnectionApi, TxInfo, }; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use assert2::assert; use futures::{channel::mpsc::unbounded, StreamExt}; use crate::{config::setup_test, test::helpers::basic_test_context}; -/// This test exercises the aleph-client code for interacting with contracts by testing a simple contract that maintains -/// some state and publishes some events. +/// This test exercises the aleph-client code for interacting with contracts by testing a simple +/// contract that maintains some state and publishes some events. The events are obtained by +/// listening mechanism. #[tokio::test] -pub async fn adder() -> Result<()> { +pub async fn adder_events_listening() -> Result<()> { let config = setup_test(); let (conn, _authority, account) = basic_test_context(config).await?; @@ -60,6 +64,48 @@ pub async fn adder() -> Result<()> { Ok(()) } +/// This test exercises the aleph-client code for interacting with contracts by testing a simple +/// contract that maintains some state and publishes some events. The events are obtained by +/// fetching mechanism. +#[tokio::test] +pub async fn adder_fetching_events() -> Result<()> { + let config = setup_test(); + + let (conn, _authority, account) = basic_test_context(config).await?; + + let contract = AdderInstance::new( + &config.test_case_params.adder, + &config.test_case_params.adder_metadata, + )?; + + let increment = 10; + let before = contract.get(&conn).await?; + + let tx_info = contract.add(&account.sign(&conn), increment).await?; + let events = get_contract_events(&conn, &contract.contract, tx_info).await?; + let event = match &*events { + [event] => event, + _ => return Err(anyhow!("Expected single event, but got {events:?}")), + }; + + assert!(event.name == Some("ValueChanged".to_string())); + assert!(event.contract == *contract.contract.address()); + assert!(event.data["new_value"] == Value::UInt(before as u128 + 10)); + + let after = contract.get(&conn).await?; + assert!(after == before + increment); + + let new_name = "test"; + contract.set_name(&account.sign(&conn), None).await?; + assert!(contract.get_name(&conn).await?.is_none()); + contract + .set_name(&account.sign(&conn), Some(new_name)) + .await?; + assert!(contract.get_name(&conn).await? == Some(new_name.to_string())); + + Ok(()) +} + #[derive(Debug)] struct AdderInstance { contract: ContractInstance, @@ -95,7 +141,7 @@ impl AdderInstance { self.contract.contract_read0(conn, "get").await } - pub async fn add(&self, conn: &S, value: u32) -> Result<()> { + pub async fn add(&self, conn: &S, value: u32) -> Result { self.contract .contract_exec(conn, "add", &[value.to_string()]) .await @@ -105,7 +151,7 @@ impl AdderInstance { &self, conn: &S, name: Option<&str>, - ) -> Result<()> { + ) -> Result { let name = name.map_or_else( || "None".to_string(), |name| { diff --git a/flooder/Cargo.lock b/flooder/Cargo.lock index a0c8937379..e8604a36fa 100644 --- a/flooder/Cargo.lock +++ b/flooder/Cargo.lock @@ -49,7 +49,7 @@ dependencies = [ [[package]] name = "aleph_client" -version = "2.9.1" +version = "2.10.0" dependencies = [ "anyhow", "async-trait",