diff --git a/Cargo.lock b/Cargo.lock index f7efb88..03bcbc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -257,7 +257,7 @@ dependencies = [ [[package]] name = "congruity" -version = "0.2.1" +version = "0.2.2" dependencies = [ "base58check", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index 1560370..c969a61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "congruity" -version = "0.2.1" +version = "0.2.2" description = "A Telegram bot for monitoring events on Concordium blockchain." license = "Apache-2.0" homepage = "https://github.com/crabvk/congruity" diff --git a/README.md b/README.md index 85f63f5..203a244 100644 --- a/README.md +++ b/README.md @@ -31,8 +31,6 @@ When the application starts, `.env` file is loaded from the current directory or ## TODO -* send Telegram messages in a separate task -* catch Telegram API exceptions * handle crashes in spawned tasks * write more code comments * write tests @@ -44,8 +42,8 @@ Feel free to create an issue if you found a bug, want to request a feature or ha ## Sponsoring -Work on this project is done in my free time and takes time and effort. -If you find the bot useful and want to help its depelopment please donate some CCD to 49YugmeWQGApKgpzkR7rcefA7KaoujeToXkA8umdCvJ1DFq4RN. +Work on this project is done in my free time, and takes time and effort. If you find the bot useful and want to help its depelopment please send a donation. +CCD wallet address: 49YugmeWQGApKgpzkR7rcefA7KaoujeToXkA8umdCvJ1DFq4RN ## Resources diff --git a/src/main.rs b/src/main.rs index 65ac5b8..d18b3dc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod db; mod listener; mod repl; mod rpc; +mod sender; mod states; mod transitions; mod types; @@ -17,7 +18,7 @@ use sqlx::{Pool, Postgres}; use teloxide::{ adaptors::DefaultParseMode, dispatching::update_listeners, prelude::*, types::ParseMode, }; -use tokio::sync::OnceCell; +use tokio::sync::{mpsc, OnceCell}; use utils::*; type BotType = AutoSend>; @@ -60,14 +61,18 @@ async fn run() { // Update the list of the bot commands bot.set_my_commands(command::commands()).await.unwrap(); + // Spawn Telegram messages sender + let (tx, rx) = mpsc::channel(2048); + tokio::spawn(sender::handle_messages(rx, bot.clone())); + // Find and process missing account updates - updates::process_updates_since_last_ati(&bot, pool).await; + updates::process_updates_since_last_ati(tx.clone(), pool).await; // Handle Concordium account updates via PostgreSQL pub/sub channel - tokio::spawn(updates::handle_updates(bot.clone())); + tokio::spawn(updates::handle_updates(tx)); if let Some(host) = std::env::var("TELEGRAM_WEBHOOK_HOST").ok() { - info!("Receiving updates via webhook on host: {}", host); + info!("Receiving updates via webhook: {}", host); let listener = webhook(host, token, &bot).await; repl::dialogue_repl(bot, listener).await; } else { diff --git a/src/sender.rs b/src/sender.rs new file mode 100644 index 0000000..28ef47a --- /dev/null +++ b/src/sender.rs @@ -0,0 +1,36 @@ +use crate::{redis_client, BotType}; +use log::*; +use redis::AsyncCommands; +use teloxide::prelude::*; +use tokio::sync::mpsc::Receiver; + +#[derive(Debug)] +pub struct Message { + index_id: i64, + user_ids: Vec, + text: String, +} + +impl Message { + pub fn new(index_id: i64, user_ids: Vec, text: String) -> Self { + Self { + index_id, + user_ids, + text, + } + } +} + +pub async fn handle_messages(mut rx: Receiver, bot: BotType) { + let mut conn = redis_client().await.get_async_connection().await.unwrap(); + + while let Some(msg) = rx.recv().await { + for user_id in msg.user_ids { + match bot.send_message(user_id, &msg.text).await { + Ok(_) => debug!("Message sent to Telegram ID={}", user_id), + Err(err) => error!("{}", err), + } + } + let _: () = conn.set("ati:latest", msg.index_id).await.unwrap(); + } +} diff --git a/src/updates.rs b/src/updates.rs index 15a1f18..56fc5c5 100644 --- a/src/updates.rs +++ b/src/updates.rs @@ -1,16 +1,13 @@ -use crate::{db, pg_pool, redis_client}; -use crate::{types::*, utils::*, BotType}; +use crate::{db, pg_pool, redis_client, sender::Message, types::*, utils::*}; use log::*; -use redis::{aio::Connection, AsyncCommands}; +use redis::AsyncCommands; use sqlx::{postgres::PgListener, Pool, Postgres}; use std::collections::HashMap; -use teloxide::prelude::*; +use tokio::sync::mpsc::Sender; const TX_CHANNEL: &str = "tx_channel"; -pub async fn handle_updates(bot: BotType) -> Result<(), sqlx::Error> { - let mut conn = redis_client().await.get_async_connection().await.unwrap(); - +pub async fn handle_updates(tx: Sender) -> Result<(), sqlx::Error> { let pool = pg_pool().await; let mut listener = PgListener::connect(&env("POSTGRESQL_URL")).await?; @@ -23,12 +20,12 @@ pub async fn handle_updates(bot: BotType) -> Result<(), sqlx::Error> { let update: AccountUpdate = serde_json::from_str(json).unwrap(); debug!("{:?}", update); let subscriptions = db::all_subscriptions(pool).await?; - handle_update(&bot, &mut conn, update, &subscriptions).await; + handle_update(&tx, update, &subscriptions).await; } } /// Processes account updates since last handled account transaction index. -pub async fn process_updates_since_last_ati(bot: &BotType, pool: &Pool) { +pub async fn process_updates_since_last_ati(tx: Sender, pool: &Pool) { let mut conn = redis_client().await.get_async_connection().await.unwrap(); let index_id: Option = conn.get("ati:latest").await.unwrap(); @@ -40,7 +37,7 @@ pub async fn process_updates_since_last_ati(bot: &BotType, pool: &Pool info!("Processing {} account updates", updates.len()); let subscriptions = db::all_subscriptions(pool).await.unwrap(); for update in updates { - handle_update(&bot, &mut conn, update, &subscriptions).await; + handle_update(&tx, update, &subscriptions).await; } } else { info!("No account updates found"); @@ -51,8 +48,7 @@ pub async fn process_updates_since_last_ati(bot: &BotType, pool: &Pool } async fn handle_update( - bot: &BotType, - conn: &mut Connection, + tx: &Sender, update: AccountUpdate, subscriptions: &HashMap>, ) { @@ -89,8 +85,9 @@ async fn handle_update( cost ); - notify_subscribers(&bot, msg, subscriber_ids).await; - set_index_id(conn, update.index_id).await; + tx.send(Message::new(update.index_id, subscriber_ids.to_vec(), msg)) + .await + .ok(); } } Some(Event::TransferredWithSchedule { from, to, amount }) => { @@ -105,8 +102,9 @@ async fn handle_update( cost ); - notify_subscribers(&bot, msg, subscriber_ids).await; - set_index_id(conn, update.index_id).await; + tx.send(Message::new(update.index_id, subscriber_ids.to_vec(), msg)) + .await + .ok(); } } _ => {} @@ -121,8 +119,9 @@ async fn handle_update( if let Some(reward) = reward { if let Some(subscriber_ids) = subscriptions.get(&reward.address.to_string()) { let msg = format!("Baker reward {} CCD", reward.amount,); - notify_subscribers(&bot, msg, subscriber_ids).await; - set_index_id(conn, update.index_id).await; + tx.send(Message::new(update.index_id, subscriber_ids.to_vec(), msg)) + .await + .ok(); } } } @@ -138,10 +137,6 @@ fn sender_hyperlink(sender: Option) -> String { } } -async fn set_index_id(conn: &mut Connection, index_id: i64) { - let _: () = conn.set("ati:latest", index_id).await.unwrap(); -} - fn event_for(events: Vec, address: &AccountAddress) -> Option { use Event::*; @@ -158,9 +153,3 @@ fn event_for(events: Vec, address: &AccountAddress) -> Option { } None } - -async fn notify_subscribers(bot: &BotType, message: String, subscriber_ids: &Vec) { - for sid in subscriber_ids { - bot.send_message(*sid, &message).await.ok(); - } -}