Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aleph-client: Fetch contract events #877

Merged
merged 2 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aleph-client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aleph-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "aleph_client"
# TODO bump major version when API stablize
version = "2.9.1"
version = "2.10.0"
edition = "2021"
license = "Apache 2.0"

Expand Down
2 changes: 2 additions & 0 deletions aleph-client/src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ pub trait ConnectionApi: Sync {
/// Data regarding submitted transaction.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
pub struct TxInfo {
/// Hash of the block containing tx.
pub block_hash: BlockHash,
/// Hash of the transaction itself.
pub tx_hash: TxHash,
}

Expand Down
114 changes: 86 additions & 28 deletions aleph-client/src/contract/event.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Utilities for listening for contract events.
//!
//! To use the module you will need to pass a connection, some contracts and an `UnboundedSender` to the
//! [listen_contract_events] function. You most likely want to `tokio::spawn` the resulting future, so that it runs
//! concurrently.
//! To use the listening feature you will need to pass a connection, some contracts and an
//! `UnboundedSender` to the [listen_contract_events] function. You most likely want to
//! `tokio::spawn` the resulting future, so that it runs concurrently.
//!
//! ```no_run
//! # use std::sync::Arc;
Expand Down Expand Up @@ -41,14 +41,39 @@
//! # Ok(())
//! # }
//! ```
//!
//! To use the fetching feature you will need to pass a connection, transaction coordinates and
//! a corresponding contract to the [get_contract_events] function.
//!
//! ```no_run
//! # use aleph_client::{AccountId, Connection, SignedConnection};
//! # use aleph_client::contract::ContractInstance;
//! # use aleph_client::contract::event::{get_contract_events, listen_contract_events};
//! # use anyhow::Result;
//! use futures::{channel::mpsc::unbounded, StreamExt};
//!
//! # async fn example(conn: Connection, signed_conn: SignedConnection, address: AccountId, path: &str) -> Result<()> {
//! let contract = ContractInstance::new(address, path)?;
//!
//! let tx_info = contract.contract_exec0(&signed_conn, "some_method").await?;
//!
//! println!("Received events {:?}", get_contract_events(&conn, &contract, tx_info).await);
//!
//! # Ok(())
//! # }
//! ```
obrok marked this conversation as resolved.
Show resolved Hide resolved

use std::collections::HashMap;
use std::{collections::HashMap, error::Error};

use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use contract_transcode::Value;
use futures::{channel::mpsc::UnboundedSender, StreamExt};
use subxt::events::EventDetails;

use crate::{contract::ContractInstance, AccountId, Connection};
use crate::{
api::contracts::events::ContractEmitted, connections::TxInfo, contract::ContractInstance,
utility::BlocksApi, AccountId, Connection,
};

/// Represents a single event emitted by a contract.
#[derive(Debug, Clone, Eq, PartialEq)]
Expand All @@ -61,6 +86,18 @@ pub struct ContractEvent {
pub data: HashMap<String, Value>,
}

/// Fetch all events that corresponds to the contract call identified by `tx_info`.
pub async fn get_contract_events(
conn: &Connection,
contract: &ContractInstance,
tx_info: TxInfo,
) -> Result<Vec<ContractEvent>> {
let events = conn.get_tx_events(tx_info).await?;
translate_events(events.iter(), &[contract])
.into_iter()
.collect()
}

/// Starts an event listening loop.
///
/// Will send contract event and every error encountered while fetching through the provided [UnboundedSender].
Expand All @@ -79,33 +116,54 @@ pub async fn listen_contract_events(
if sender.is_closed() {
break;
}
let events = block?.events().await?;
for event in translate_events(events.iter(), contracts) {
sender.unbounded_send(event)?;
}
}

let block = block?;
Ok(())
}

for event in block.events().await?.iter() {
let event = event?;
/// Try to convert `events` to `ContractEvent` using matching contract from `contracts`.
fn translate_events<
Err: Error + Into<anyhow::Error> + Send + Sync + 'static,
E: Iterator<Item = Result<EventDetails, Err>>,
>(
events: E,
contracts: &[&ContractInstance],
) -> Vec<Result<ContractEvent>> {
events
.filter_map(|maybe_event| {
maybe_event
.map(|e| e.as_event::<ContractEmitted>().ok().flatten())
.transpose()
})
.map(|maybe_event| match maybe_event {
Ok(e) => translate_event(&e, contracts),
Err(e) => Err(anyhow::Error::from(e)),
})
.collect()
}

if let Some(event) =
event.as_event::<crate::api::contracts::events::ContractEmitted>()?
{
if let Some(contract) = contracts
.iter()
.find(|contract| contract.address() == &event.contract)
{
let data = zero_prefixed(&event.data);
let event = contract
.transcoder
.decode_contract_event(&mut data.as_slice());
/// Try to convert `event` to `ContractEvent` using matching contract from `contracts`.
fn translate_event(
event: &ContractEmitted,
contracts: &[&ContractInstance],
) -> Result<ContractEvent> {
let matching_contract = contracts
.iter()
.find(|contract| contract.address() == &event.contract)
.ok_or(anyhow!(
"The event wasn't emitted by any of the provided contracts"
))?;

sender.unbounded_send(
event.and_then(|event| build_event(contract.address().clone(), event)),
)?;
}
}
}
}
let data = zero_prefixed(&event.data);
let data = matching_contract
.transcoder
.decode_contract_event(&mut data.as_slice())?;

Ok(())
build_event(matching_contract.address.clone(), data)
}

/// The contract transcoder assumes there is an extra byte (that it discards) indicating the size of the data. However,
Expand Down
14 changes: 7 additions & 7 deletions aleph-client/src/contract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! ```no_run
//! # use anyhow::{Result, Context};
//! # use aleph_client::{AccountId, Balance};
//! # use aleph_client::{Connection, SignedConnection};
//! # use aleph_client::{Connection, SignedConnection, TxInfo};
//! # use aleph_client::contract::ContractInstance;
//! #
//! #[derive(Debug)]
Expand All @@ -24,7 +24,7 @@
//! })
//! }
//!
//! async fn transfer(&self, conn: &SignedConnection, to: AccountId, amount: Balance) -> Result<()> {
//! async fn transfer(&self, conn: &SignedConnection, to: AccountId, amount: Balance) -> Result<TxInfo> {
//! self.contract.contract_exec(
//! conn,
//! "PSP22::transfer",
Expand Down Expand Up @@ -52,6 +52,7 @@ use contract_transcode::ContractMessageTranscoder;
pub use convertible_value::ConvertibleValue;

use crate::{
connections::TxInfo,
contract_transcode::Value,
pallets::contract::{ContractCallArgs, ContractRpc, ContractsUserApi},
sp_weights::weight_v2::Weight,
Expand Down Expand Up @@ -128,7 +129,7 @@ impl ContractInstance {
&self,
conn: &C,
message: &str,
) -> Result<()> {
) -> Result<TxInfo> {
self.contract_exec::<C, String>(conn, message, &[]).await
}

Expand All @@ -138,7 +139,7 @@ impl ContractInstance {
conn: &C,
message: &str,
args: &[S],
) -> Result<()> {
) -> Result<TxInfo> {
self.contract_exec_value::<C, S>(conn, message, args, 0)
.await
}
Expand All @@ -149,7 +150,7 @@ impl ContractInstance {
conn: &C,
message: &str,
value: Balance,
) -> Result<()> {
) -> Result<TxInfo> {
self.contract_exec_value::<C, String>(conn, message, &[], value)
.await
}
Expand All @@ -161,7 +162,7 @@ impl ContractInstance {
message: &str,
args: &[S],
value: Balance,
) -> Result<()> {
) -> Result<TxInfo> {
let data = self.encode(message, args)?;
conn.call(
self.address.clone(),
Expand All @@ -175,7 +176,6 @@ impl ContractInstance {
TxStatus::InBlock,
)
.await
.map(|_| ())
}

fn encode<S: AsRef<str> + Debug>(&self, message: &str, args: &[S]) -> Result<Vec<u8>> {
Expand Down
2 changes: 1 addition & 1 deletion aleph-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub type SubxtClient = OnlineClient<AlephConfig>;

pub use connections::{
AsConnection, AsSigned, Connection, ConnectionApi, RootConnection, SignedConnection,
SignedConnectionApi, SudoCall,
SignedConnectionApi, SudoCall, TxInfo,
};

/// When submitting a transaction, wait for given status before proceeding.
Expand Down
2 changes: 1 addition & 1 deletion benches/payout-stakers/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/cliain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2e-tests/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 15 additions & 23 deletions e2e-tests/src/test/adder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::{fmt::Debug, str::FromStr, sync::Arc};
use std::{fmt::Debug, str::FromStr};

use aleph_client::{
contract::{event::listen_contract_events, ContractInstance},
contract::{event::get_contract_events, ContractInstance},
contract_transcode::Value,
AccountId, ConnectionApi, SignedConnectionApi,
AccountId, ConnectionApi, SignedConnectionApi, TxInfo,
};
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use assert2::assert;
use futures::{channel::mpsc::unbounded, StreamExt};

use crate::{config::setup_test, test::helpers::basic_test_context};

Expand All @@ -19,26 +18,22 @@ pub async fn adder() -> Result<()> {

let (conn, _authority, account) = basic_test_context(config).await?;

let contract = Arc::new(AdderInstance::new(
let contract = AdderInstance::new(
&config.test_case_params.adder,
&config.test_case_params.adder_metadata,
)?);

let listen_conn = conn.clone();
let listen_contract = contract.clone();
let (tx, mut rx) = unbounded();
let listen = || async move {
listen_contract_events(&listen_conn, &[listen_contract.as_ref().into()], tx).await?;
<Result<(), anyhow::Error>>::Ok(())
};
let join = tokio::spawn(listen());
obrok marked this conversation as resolved.
Show resolved Hide resolved
)?;

let increment = 10;
let before = contract.get(&conn).await?;

contract.add(&account.sign(&conn), increment).await?;
let tx_info = contract.add(&account.sign(&conn), increment).await?;
let events = get_contract_events(&conn, &contract.contract, tx_info).await?;
let event = match events.len() {
0 => return Err(anyhow!("No event received")),
1 => events[0].clone(),
_ => return Err(anyhow!("Too many events received")),
};
pmikolajczyk41 marked this conversation as resolved.
Show resolved Hide resolved

let event = rx.next().await.context("No event received")??;
assert!(event.name == Some("ValueChanged".to_string()));
assert!(event.contract == *contract.contract.address());
assert!(event.data["new_value"] == Value::UInt(before as u128 + 10));
Expand All @@ -54,9 +49,6 @@ pub async fn adder() -> Result<()> {
.await?;
assert!(contract.get_name(&conn).await? == Some(new_name.to_string()));

rx.close();
join.await??;

Ok(())
}

Expand Down Expand Up @@ -95,7 +87,7 @@ impl AdderInstance {
self.contract.contract_read0(conn, "get").await
}

pub async fn add<S: SignedConnectionApi>(&self, conn: &S, value: u32) -> Result<()> {
pub async fn add<S: SignedConnectionApi>(&self, conn: &S, value: u32) -> Result<TxInfo> {
self.contract
.contract_exec(conn, "add", &[value.to_string()])
.await
Expand All @@ -105,7 +97,7 @@ impl AdderInstance {
&self,
conn: &S,
name: Option<&str>,
) -> Result<()> {
) -> Result<TxInfo> {
let name = name.map_or_else(
|| "None".to_string(),
|name| {
Expand Down
2 changes: 1 addition & 1 deletion flooder/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.