Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aleph-client: Fetch contract events #877

Merged
merged 2 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aleph-client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aleph-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "aleph_client"
# TODO bump major version when API stablize
version = "2.9.1"
version = "2.10.0"
edition = "2021"
license = "Apache 2.0"

Expand Down
2 changes: 2 additions & 0 deletions aleph-client/src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ pub trait ConnectionApi: Sync {
/// Data regarding submitted transaction.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
pub struct TxInfo {
/// Hash of the block containing tx.
pub block_hash: BlockHash,
/// Hash of the transaction itself.
pub tx_hash: TxHash,
}

Expand Down
201 changes: 132 additions & 69 deletions aleph-client/src/contract/event.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,25 @@
//! Utilities for listening for contract events.
//! This module provides utilities corresponding to the events emitted by a contract.
//!
//! To use the module you will need to pass a connection, some contracts and an `UnboundedSender` to the
//! [listen_contract_events] function. You most likely want to `tokio::spawn` the resulting future, so that it runs
//! concurrently.
//!
//! ```no_run
//! # use std::sync::Arc;
//! # use std::sync::mpsc::channel;
//! # use std::time::Duration;
//! # use aleph_client::{AccountId, Connection, SignedConnection};
//! # use aleph_client::contract::ContractInstance;
//! # use aleph_client::contract::event::{listen_contract_events};
//! # use anyhow::Result;
//! use futures::{channel::mpsc::unbounded, StreamExt};
//!
//! # async fn example(conn: Connection, signed_conn: SignedConnection, address1: AccountId, address2: AccountId, path1: &str, path2: &str) -> Result<()> {
//! // The `Arc` makes it possible to pass a reference to the contract to another thread
//! let contract1 = Arc::new(ContractInstance::new(address1, path1)?);
//! let contract2 = Arc::new(ContractInstance::new(address2, path2)?);
//!
//! let conn_copy = conn.clone();
//! let contract1_copy = contract1.clone();
//! let contract2_copy = contract2.clone();
//!
//! let (tx, mut rx) = unbounded();
//! let listen = || async move {
//! listen_contract_events(&conn, &[contract1_copy.as_ref(), contract2_copy.as_ref()], tx).await?;
//! <Result<(), anyhow::Error>>::Ok(())
//! };
//! let join = tokio::spawn(listen());
//!
//! contract1.contract_exec0(&signed_conn, "some_method").await?;
//! contract2.contract_exec0(&signed_conn, "some_other_method").await?;
//!
//! println!("Received event {:?}", rx.next().await);
//!
//! rx.close();
//! join.await??;
//!
//! # Ok(())
//! # }
//! ```
//! There are two ways that you can get contract events:
//! 1. By fetching events corresponding to a particular transaction. For this, you will need to
//! provide a connection, contract instance and transaction coordinate to [get_contract_events]
//! function. Similarly to [crate::utility::BlocksApi::get_tx_events], it will fetch block
//! events, filter them and decode all relevant ones.
//! 2. By listening to all contract events. For this, you will need to provide a connection, some
//! contracts and an `UnboundedSender` to the [listen_contract_events] function. In a loop,
//! it will inspect every finalized block and look for contract events.

use std::collections::HashMap;
use std::{collections::HashMap, error::Error};

use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use contract_transcode::Value;
use futures::{channel::mpsc::UnboundedSender, StreamExt};
use subxt::events::EventDetails;

use crate::{contract::ContractInstance, AccountId, Connection};
use crate::{
api::contracts::events::ContractEmitted, connections::TxInfo, contract::ContractInstance,
utility::BlocksApi, AccountId, Connection,
};

/// Represents a single event emitted by a contract.
#[derive(Debug, Clone, Eq, PartialEq)]
Expand All @@ -61,13 +32,84 @@ pub struct ContractEvent {
pub data: HashMap<String, Value>,
}

/// Starts an event listening loop.
/// Fetch and decode all events that correspond to the call identified by `tx_info` made to
/// `contract`.
///
/// ```no_run
/// # use aleph_client::{AccountId, Connection, SignedConnection};
/// # use aleph_client::contract::ContractInstance;
/// # use aleph_client::contract::event::{get_contract_events, listen_contract_events};
/// # use anyhow::Result;
/// use futures::{channel::mpsc::unbounded, StreamExt};
///
/// # async fn example(conn: Connection, signed_conn: SignedConnection, address: AccountId, path: &str) -> Result<()> {
/// let contract = ContractInstance::new(address, path)?;
///
/// let tx_info = contract.contract_exec0(&signed_conn, "some_method").await?;
///
/// println!("Received events {:?}", get_contract_events(&conn, &contract, tx_info).await);
///
/// # Ok(())
/// # }
/// ```
pub async fn get_contract_events(
conn: &Connection,
contract: &ContractInstance,
tx_info: TxInfo,
) -> Result<Vec<ContractEvent>> {
let events = conn.get_tx_events(tx_info).await?;
translate_events(events.iter(), &[contract])
.into_iter()
.collect()
}

/// Starts an event listening loop. Will send contract event and every error encountered while
/// fetching through the provided [UnboundedSender].
///
/// Will send contract event and every error encountered while fetching through the provided [UnboundedSender].
/// Only events coming from the address of one of the `contracts` will be decoded.
///
/// The loop will terminate once `sender` is closed. The loop may also terminate in case of errors while fetching blocks
/// or decoding events (pallet events, contract event decoding errors are sent over the channel).
///
/// You most likely want to `tokio::spawn` the resulting future, so that it runs concurrently.
///
/// ```no_run
/// # use std::sync::Arc;
/// # use std::sync::mpsc::channel;
/// # use std::time::Duration;
/// # use aleph_client::{AccountId, Connection, SignedConnection};
/// # use aleph_client::contract::ContractInstance;
/// # use aleph_client::contract::event::{listen_contract_events};
/// # use anyhow::Result;
/// use futures::{channel::mpsc::unbounded, StreamExt};
///
/// # async fn example(conn: Connection, signed_conn: SignedConnection, address1: AccountId, address2: AccountId, path1: &str, path2: &str) -> Result<()> {
/// // The `Arc` makes it possible to pass a reference to the contract to another thread
/// let contract1 = Arc::new(ContractInstance::new(address1, path1)?);
/// let contract2 = Arc::new(ContractInstance::new(address2, path2)?);
///
/// let conn_copy = conn.clone();
/// let contract1_copy = contract1.clone();
/// let contract2_copy = contract2.clone();
///
/// let (tx, mut rx) = unbounded();
/// let listen = || async move {
/// listen_contract_events(&conn, &[contract1_copy.as_ref(), contract2_copy.as_ref()], tx).await?;
/// <Result<(), anyhow::Error>>::Ok(())
/// };
/// let join = tokio::spawn(listen());
///
/// contract1.contract_exec0(&signed_conn, "some_method").await?;
/// contract2.contract_exec0(&signed_conn, "some_other_method").await?;
///
/// println!("Received event {:?}", rx.next().await);
///
/// rx.close();
/// join.await??;
///
/// # Ok(())
/// # }
/// ```
pub async fn listen_contract_events(
conn: &Connection,
contracts: &[&ContractInstance],
Expand All @@ -79,33 +121,54 @@ pub async fn listen_contract_events(
if sender.is_closed() {
break;
}
let events = block?.events().await?;
for event in translate_events(events.iter(), contracts) {
sender.unbounded_send(event)?;
}
}

let block = block?;
Ok(())
}

for event in block.events().await?.iter() {
let event = event?;
/// Try to convert `events` to `ContractEvent` using matching contract from `contracts`.
fn translate_events<
Err: Error + Into<anyhow::Error> + Send + Sync + 'static,
E: Iterator<Item = Result<EventDetails, Err>>,
>(
events: E,
contracts: &[&ContractInstance],
) -> Vec<Result<ContractEvent>> {
events
.filter_map(|maybe_event| {
maybe_event
.map(|e| e.as_event::<ContractEmitted>().ok().flatten())
.transpose()
})
.map(|maybe_event| match maybe_event {
Ok(e) => translate_event(&e, contracts),
Err(e) => Err(anyhow::Error::from(e)),
})
.collect()
}

if let Some(event) =
event.as_event::<crate::api::contracts::events::ContractEmitted>()?
{
if let Some(contract) = contracts
.iter()
.find(|contract| contract.address() == &event.contract)
{
let data = zero_prefixed(&event.data);
let event = contract
.transcoder
.decode_contract_event(&mut data.as_slice());
/// Try to convert `event` to `ContractEvent` using matching contract from `contracts`.
fn translate_event(
event: &ContractEmitted,
contracts: &[&ContractInstance],
) -> Result<ContractEvent> {
let matching_contract = contracts
.iter()
.find(|contract| contract.address() == &event.contract)
.ok_or(anyhow!(
"The event wasn't emitted by any of the provided contracts"
))?;

sender.unbounded_send(
event.and_then(|event| build_event(contract.address().clone(), event)),
)?;
}
}
}
}
let data = zero_prefixed(&event.data);
let data = matching_contract
.transcoder
.decode_contract_event(&mut data.as_slice())?;

Ok(())
build_event(matching_contract.address.clone(), data)
}

/// The contract transcoder assumes there is an extra byte (that it discards) indicating the size of the data. However,
Expand Down
14 changes: 7 additions & 7 deletions aleph-client/src/contract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! ```no_run
//! # use anyhow::{Result, Context};
//! # use aleph_client::{AccountId, Balance};
//! # use aleph_client::{Connection, SignedConnection};
//! # use aleph_client::{Connection, SignedConnection, TxInfo};
//! # use aleph_client::contract::ContractInstance;
//! #
//! #[derive(Debug)]
Expand All @@ -24,7 +24,7 @@
//! })
//! }
//!
//! async fn transfer(&self, conn: &SignedConnection, to: AccountId, amount: Balance) -> Result<()> {
//! async fn transfer(&self, conn: &SignedConnection, to: AccountId, amount: Balance) -> Result<TxInfo> {
//! self.contract.contract_exec(
//! conn,
//! "PSP22::transfer",
Expand Down Expand Up @@ -52,6 +52,7 @@ use contract_transcode::ContractMessageTranscoder;
pub use convertible_value::ConvertibleValue;

use crate::{
connections::TxInfo,
contract_transcode::Value,
pallets::contract::{ContractCallArgs, ContractRpc, ContractsUserApi},
sp_weights::weight_v2::Weight,
Expand Down Expand Up @@ -128,7 +129,7 @@ impl ContractInstance {
&self,
conn: &C,
message: &str,
) -> Result<()> {
) -> Result<TxInfo> {
self.contract_exec::<C, String>(conn, message, &[]).await
}

Expand All @@ -138,7 +139,7 @@ impl ContractInstance {
conn: &C,
message: &str,
args: &[S],
) -> Result<()> {
) -> Result<TxInfo> {
self.contract_exec_value::<C, S>(conn, message, args, 0)
.await
}
Expand All @@ -149,7 +150,7 @@ impl ContractInstance {
conn: &C,
message: &str,
value: Balance,
) -> Result<()> {
) -> Result<TxInfo> {
self.contract_exec_value::<C, String>(conn, message, &[], value)
.await
}
Expand All @@ -161,7 +162,7 @@ impl ContractInstance {
message: &str,
args: &[S],
value: Balance,
) -> Result<()> {
) -> Result<TxInfo> {
let data = self.encode(message, args)?;
conn.call(
self.address.clone(),
Expand All @@ -175,7 +176,6 @@ impl ContractInstance {
TxStatus::InBlock,
)
.await
.map(|_| ())
}

fn encode<S: AsRef<str> + Debug>(&self, message: &str, args: &[S]) -> Result<Vec<u8>> {
Expand Down
2 changes: 1 addition & 1 deletion aleph-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub type SubxtClient = OnlineClient<AlephConfig>;

pub use connections::{
AsConnection, AsSigned, Connection, ConnectionApi, RootConnection, SignedConnection,
SignedConnectionApi, SudoCall,
SignedConnectionApi, SudoCall, TxInfo,
};

/// When submitting a transaction, wait for given status before proceeding.
Expand Down
2 changes: 1 addition & 1 deletion benches/payout-stakers/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/cliain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2e-tests/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading