Skip to content
This repository has been archived by the owner on Jun 25, 2022. It is now read-only.

Commit

Permalink
Send Telegram messages in a separate task
Browse files Browse the repository at this point in the history
  • Loading branch information
crabvk committed Jan 19, 2022
1 parent 403c48e commit 42699e0
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "congruity"
version = "0.2.1"
version = "0.2.2"
description = "A Telegram bot for monitoring events on Concordium blockchain."
license = "Apache-2.0"
homepage = "https://github.com/crabvk/congruity"
Expand Down
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ When the application starts, `.env` file is loaded from the current directory or

## TODO

* send Telegram messages in a separate task
* catch Telegram API exceptions
* handle crashes in spawned tasks
* write more code comments
* write tests
Expand All @@ -44,8 +42,8 @@ Feel free to create an issue if you found a bug, want to request a feature or ha

## Sponsoring

Work on this project is done in my free time and takes time and effort.
If you find the bot useful and want to help its depelopment please donate some CCD to 49YugmeWQGApKgpzkR7rcefA7KaoujeToXkA8umdCvJ1DFq4RN.
Work on this project is done in my free time, and takes time and effort. If you find the bot useful and want to help its depelopment please send a donation.
CCD wallet address: 49YugmeWQGApKgpzkR7rcefA7KaoujeToXkA8umdCvJ1DFq4RN

## Resources

Expand Down
13 changes: 9 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod db;
mod listener;
mod repl;
mod rpc;
mod sender;
mod states;
mod transitions;
mod types;
Expand All @@ -17,7 +18,7 @@ use sqlx::{Pool, Postgres};
use teloxide::{
adaptors::DefaultParseMode, dispatching::update_listeners, prelude::*, types::ParseMode,
};
use tokio::sync::OnceCell;
use tokio::sync::{mpsc, OnceCell};
use utils::*;

type BotType = AutoSend<DefaultParseMode<Bot>>;
Expand Down Expand Up @@ -60,14 +61,18 @@ async fn run() {
// Update the list of the bot commands
bot.set_my_commands(command::commands()).await.unwrap();

// Spawn Telegram messages sender
let (tx, rx) = mpsc::channel(2048);
tokio::spawn(sender::handle_messages(rx, bot.clone()));

// Find and process missing account updates
updates::process_updates_since_last_ati(&bot, pool).await;
updates::process_updates_since_last_ati(tx.clone(), pool).await;

// Handle Concordium account updates via PostgreSQL pub/sub channel
tokio::spawn(updates::handle_updates(bot.clone()));
tokio::spawn(updates::handle_updates(tx));

if let Some(host) = std::env::var("TELEGRAM_WEBHOOK_HOST").ok() {
info!("Receiving updates via webhook on host: {}", host);
info!("Receiving updates via webhook: {}", host);
let listener = webhook(host, token, &bot).await;
repl::dialogue_repl(bot, listener).await;
} else {
Expand Down
36 changes: 36 additions & 0 deletions src/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::{redis_client, BotType};
use log::*;
use redis::AsyncCommands;
use teloxide::prelude::*;
use tokio::sync::mpsc::Receiver;

#[derive(Debug)]
pub struct Message {
index_id: i64,
user_ids: Vec<i64>,
text: String,
}

impl Message {
pub fn new(index_id: i64, user_ids: Vec<i64>, text: String) -> Self {
Self {
index_id,
user_ids,
text,
}
}
}

pub async fn handle_messages(mut rx: Receiver<Message>, bot: BotType) {
let mut conn = redis_client().await.get_async_connection().await.unwrap();

while let Some(msg) = rx.recv().await {
for user_id in msg.user_ids {
match bot.send_message(user_id, &msg.text).await {
Ok(_) => debug!("Message sent to Telegram ID={}", user_id),
Err(err) => error!("{}", err),
}
}
let _: () = conn.set("ati:latest", msg.index_id).await.unwrap();
}
}
45 changes: 17 additions & 28 deletions src/updates.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use crate::{db, pg_pool, redis_client};
use crate::{types::*, utils::*, BotType};
use crate::{db, pg_pool, redis_client, sender::Message, types::*, utils::*};
use log::*;
use redis::{aio::Connection, AsyncCommands};
use redis::AsyncCommands;
use sqlx::{postgres::PgListener, Pool, Postgres};
use std::collections::HashMap;
use teloxide::prelude::*;
use tokio::sync::mpsc::Sender;

const TX_CHANNEL: &str = "tx_channel";

pub async fn handle_updates(bot: BotType) -> Result<(), sqlx::Error> {
let mut conn = redis_client().await.get_async_connection().await.unwrap();

pub async fn handle_updates(tx: Sender<Message>) -> Result<(), sqlx::Error> {
let pool = pg_pool().await;
let mut listener = PgListener::connect(&env("POSTGRESQL_URL")).await?;

Expand All @@ -23,12 +20,12 @@ pub async fn handle_updates(bot: BotType) -> Result<(), sqlx::Error> {
let update: AccountUpdate = serde_json::from_str(json).unwrap();
debug!("{:?}", update);
let subscriptions = db::all_subscriptions(pool).await?;
handle_update(&bot, &mut conn, update, &subscriptions).await;
handle_update(&tx, update, &subscriptions).await;
}
}

/// Processes account updates since last handled account transaction index.
pub async fn process_updates_since_last_ati(bot: &BotType, pool: &Pool<Postgres>) {
pub async fn process_updates_since_last_ati(tx: Sender<Message>, pool: &Pool<Postgres>) {
let mut conn = redis_client().await.get_async_connection().await.unwrap();
let index_id: Option<i64> = conn.get("ati:latest").await.unwrap();

Expand All @@ -40,7 +37,7 @@ pub async fn process_updates_since_last_ati(bot: &BotType, pool: &Pool<Postgres>
info!("Processing {} account updates", updates.len());
let subscriptions = db::all_subscriptions(pool).await.unwrap();
for update in updates {
handle_update(&bot, &mut conn, update, &subscriptions).await;
handle_update(&tx, update, &subscriptions).await;
}
} else {
info!("No account updates found");
Expand All @@ -51,8 +48,7 @@ pub async fn process_updates_since_last_ati(bot: &BotType, pool: &Pool<Postgres>
}

async fn handle_update(
bot: &BotType,
conn: &mut Connection,
tx: &Sender<Message>,
update: AccountUpdate,
subscriptions: &HashMap<String, Vec<i64>>,
) {
Expand Down Expand Up @@ -89,8 +85,9 @@ async fn handle_update(
cost
);

notify_subscribers(&bot, msg, subscriber_ids).await;
set_index_id(conn, update.index_id).await;
tx.send(Message::new(update.index_id, subscriber_ids.to_vec(), msg))
.await
.ok();
}
}
Some(Event::TransferredWithSchedule { from, to, amount }) => {
Expand All @@ -105,8 +102,9 @@ async fn handle_update(
cost
);

notify_subscribers(&bot, msg, subscriber_ids).await;
set_index_id(conn, update.index_id).await;
tx.send(Message::new(update.index_id, subscriber_ids.to_vec(), msg))
.await
.ok();
}
}
_ => {}
Expand All @@ -121,8 +119,9 @@ async fn handle_update(
if let Some(reward) = reward {
if let Some(subscriber_ids) = subscriptions.get(&reward.address.to_string()) {
let msg = format!("Baker reward {} CCD", reward.amount,);
notify_subscribers(&bot, msg, subscriber_ids).await;
set_index_id(conn, update.index_id).await;
tx.send(Message::new(update.index_id, subscriber_ids.to_vec(), msg))
.await
.ok();
}
}
}
Expand All @@ -138,10 +137,6 @@ fn sender_hyperlink(sender: Option<AccountAddress>) -> String {
}
}

async fn set_index_id(conn: &mut Connection, index_id: i64) {
let _: () = conn.set("ati:latest", index_id).await.unwrap();
}

fn event_for(events: Vec<Event>, address: &AccountAddress) -> Option<Event> {
use Event::*;

Expand All @@ -158,9 +153,3 @@ fn event_for(events: Vec<Event>, address: &AccountAddress) -> Option<Event> {
}
None
}

async fn notify_subscribers(bot: &BotType, message: String, subscriber_ids: &Vec<i64>) {
for sid in subscriber_ids {
bot.send_message(*sid, &message).await.ok();
}
}

0 comments on commit 42699e0

Please sign in to comment.