Skip to content
This repository has been archived by the owner on Jun 25, 2022. It is now read-only.

Commit

Permalink
Cache subscriptions in Redis for fast retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
crabvk committed Jan 20, 2022
1 parent 146e975 commit f34b89b
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "congruity"
version = "0.2.2"
version = "0.3.0"
description = "A Telegram bot for monitoring events on Concordium blockchain."
license = "Apache-2.0"
homepage = "https://github.com/crabvk/congruity"
Expand Down
2 changes: 1 addition & 1 deletion src/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use teloxide::utils::command::BotCommand;

#[derive(BotCommand, Debug)]
#[derive(BotCommand)]
#[command(rename = "lowercase")]
pub enum Command {
#[command(description = "off")]
Expand Down
93 changes: 68 additions & 25 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use crate::pg_pool;
use crate::types::{AccountAddress, AccountUpdate};
use crate::{pg_pool, redis_cm};
use base58check::ToBase58Check;
use futures::TryStreamExt;
use redis::{aio::ConnectionManager, AsyncCommands, RedisResult};
use sqlx::{postgres::PgRow, PgPool, Row};
use std::collections::HashMap;
use std::result::Result;
use tokio_stream::StreamExt;

/// Returns all subscriptions.
/// For each subscriber account returns a list of Telegram user IDs because many users can be subscribed to one account updates.
pub async fn all_subscriptions(pool: &PgPool) -> Result<HashMap<String, Vec<i64>>, sqlx::Error> {
let mut subscriptions = HashMap::new();
/// Preloads all subscriptions from Postgres to Redis.
/// For each subscriber account there are a set of Telegram user IDs.
/// Many users can subscribe to updates for one account.
pub async fn load_subscriptions(pool: &PgPool) -> Result<(), sqlx::Error> {
let mut cm = redis_cm().await.clone();
let account_keys: Vec<String> = cm.keys("account:*").await.unwrap();
for key in account_keys {
let _: () = cm.del(key).await.unwrap();
}

let mut rows =
sqlx::query("SELECT account, array_agg(user_id) FROM subscriptions GROUP BY account")
Expand All @@ -18,10 +23,20 @@ pub async fn all_subscriptions(pool: &PgPool) -> Result<HashMap<String, Vec<i64>
while let Some(row) = rows.try_next().await? {
let k = row.get::<&[u8], _>(0).to_base58check(1);
let v: Vec<i64> = row.get(1);
subscriptions.insert(k, v);
let key = format!("account:{}", k);
let _: () = cm.sadd(key, v).await.unwrap();
}

Ok(subscriptions)
Ok(())
}

pub async fn subscriber_ids(
cm: &mut ConnectionManager,
account: &str,
) -> RedisResult<Option<Vec<i64>>> {
let key = format!("account:{}", account);
let user_ids: Option<Vec<i64>> = cm.smembers(key).await?;
Ok(user_ids)
}

/// Returns subscriptions for a Telegram user.
Expand All @@ -37,12 +52,13 @@ pub async fn subscriptions(user_id: i64) -> Result<Vec<String>, sqlx::Error> {
Ok(subscriptions)
}

pub async fn subscribe(user_id: i64, address: &AccountAddress) -> Result<Vec<i32>, sqlx::Error> {
pub async fn subscribe(user_id: i64, address: &AccountAddress) -> Result<bool, sqlx::Error> {
let mut cm = redis_cm().await.clone();
let pool = pg_pool().await;
let ids = sqlx::query(
let ids: Vec<i64> = sqlx::query(
r#"
INSERT INTO subscriptions (user_id, account) VALUES ($1, $2)
ON CONFLICT DO NOTHING RETURNING id
ON CONFLICT DO NOTHING RETURNING user_id
"#,
)
.bind(user_id)
Expand All @@ -51,25 +67,52 @@ ON CONFLICT DO NOTHING RETURNING id
.fetch_all(pool)
.await?;

Ok(ids)
if ids.len() > 0 {
let key = format!("account:{}", address);
let _: () = cm.sadd(key, &ids).await.unwrap();
}

Ok(ids.len() > 0)
}

pub async fn unsubscribe(
user_id: i64,
address: Option<&AccountAddress>,
) -> Result<Vec<i32>, sqlx::Error> {
pub async fn unsubscribe(user_id: i64, address: &AccountAddress) -> Result<bool, sqlx::Error> {
let mut cm = redis_cm().await.clone();
let pool = pg_pool().await;

let query = if let Some(address) = address {
sqlx::query("DELETE FROM subscriptions WHERE user_id = $1 AND account = $2 RETURNING id")
let ids: Vec<i64> = sqlx::query(
"DELETE FROM subscriptions WHERE user_id = $1 AND account = $2 RETURNING user_id",
)
.bind(user_id)
.bind(address.to_bytes())
.map(|row: PgRow| row.get(0))
.fetch_all(pool)
.await?;

if ids.len() > 0 {
let key = format!("account:{}", address);
let _: () = cm.srem(key, &ids).await.unwrap();
}

Ok(ids.len() > 0)
}

pub async fn unsubscribe_all(user_id: i64) -> Result<bool, sqlx::Error> {
let mut cm = redis_cm().await.clone();
let pool = pg_pool().await;

let pairs: Vec<(i64, String)> =
sqlx::query("DELETE FROM subscriptions WHERE user_id = $1 RETURNING user_id, account")
.bind(user_id)
.bind(address.to_bytes())
} else {
sqlx::query("DELETE FROM subscriptions WHERE user_id = $1 RETURNING id").bind(user_id)
};
.map(|row: PgRow| (row.get(0), row.get::<&[u8], _>(1).to_base58check(1)))
.fetch_all(pool)
.await?;

for (user_id, account) in &pairs {
let key = format!("account:{}", account);
let _: () = cm.srem(key, user_id).await.unwrap();
}

let ids = query.map(|row: PgRow| row.get(0)).fetch_all(pool).await?;
Ok(ids)
Ok(pairs.len() > 0)
}

/// Returns account updates since account transaction index ID.
Expand Down
10 changes: 7 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn pg_pool() -> &'static Pool<Postgres> {
PgPoolOptions::new()
.max_connections(16)
.connect_lazy(&env("POSTGRESQL_URL"))
.unwrap()
.expect("Can't create PostgreSQL connection pool")
})
.await
}
Expand Down Expand Up @@ -67,18 +67,22 @@ async fn run() {
// Update the list of the bot commands
bot.set_my_commands(command::commands()).await.unwrap();

// Cache subscriptions in Redis
info!("Loading subscriptions");
db::load_subscriptions(pool).await.unwrap();

// Spawn Telegram messages sender
let (tx, rx) = mpsc::channel(2048);
tokio::spawn(sender::handle_messages(rx, bot.clone()));

// Find and process missing account updates
updates::process_updates_since_last_ati(tx.clone(), pool).await;
updates::process_updates_since_last_ati(tx.clone()).await;

// Handle Concordium account updates via PostgreSQL pub/sub channel
tokio::spawn(updates::handle_updates(tx));

if let Some(host) = std::env::var("TELEGRAM_WEBHOOK_HOST").ok() {
info!("Receiving updates via webhook: {}", host);
info!("Receiving updates via webhook on {}", host);
let listener = webhook(host, token, &bot).await;
repl::dialogue_repl(bot, listener).await;
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn handle_messages(mut rx: Receiver<Message>, bot: BotType) {
while let Some(msg) = rx.recv().await {
for user_id in msg.user_ids {
match bot.send_message(user_id, &msg.text).await {
Ok(_) => debug!("Message sent to Telegram ID={}", user_id),
Ok(_) => debug!("Message sent to Telegram ID {}", user_id),
Err(err) => error!("{}", err),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Default for Dialogue {
#[derive(Serialize, Deserialize)]
pub struct StartState;

#[derive(Serialize, PartialEq, Deserialize)]
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub enum ReceiveAddressState {
Balance,
Subscribe,
Expand Down
21 changes: 9 additions & 12 deletions src/transitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ async fn subscribe(address: &AccountAddress, cx: TransitionIn<BotType>) -> Respo
let user_id = cx.chat_id() as i64;
let result = db::subscribe(user_id, address).await;
match result {
Ok(rows) if rows.len() > 0 => cx.answer("Subscribed successfully").await,
Ok(_) => {
Ok(true) => cx.answer("Subscribed successfully").await,
Ok(false) => {
cx.answer("You're already subscribed for this address")
.await
}
Expand All @@ -30,10 +30,9 @@ async fn answer_after_keyboard(cx: TransitionIn<BotType>, text: &str) -> Respons
}

async fn unsubscribe_all(cx: TransitionIn<BotType>) -> ResponseResult<Message> {
let result = db::unsubscribe(cx.chat_id(), None).await;
match result {
Ok(rows) if rows.len() > 0 => answer_after_keyboard(cx, "Unsubscribed successfully").await,
Ok(_) => answer_after_keyboard(cx, "No subscriptions were found").await,
match db::unsubscribe_all(cx.chat_id()).await {
Ok(true) => answer_after_keyboard(cx, "Unsubscribed successfully").await,
Ok(false) => answer_after_keyboard(cx, "No subscriptions were found").await,
Err(err) => {
error!("{}", err);
answer_after_keyboard(cx, "A database query error has occurred 😐").await
Expand All @@ -45,10 +44,9 @@ async fn unsubscribe(
address: &AccountAddress,
cx: TransitionIn<BotType>,
) -> ResponseResult<Message> {
let result = db::unsubscribe(cx.chat_id(), Some(address)).await;
match result {
Ok(rows) if rows.len() > 0 => answer_after_keyboard(cx, "Unsubscribed successfully").await,
Ok(_) => answer_after_keyboard(cx, "You're not subscribed for this address").await,
match db::unsubscribe(cx.chat_id(), address).await {
Ok(true) => answer_after_keyboard(cx, "Unsubscribed successfully").await,
Ok(false) => answer_after_keyboard(cx, "You're not subscribed for this address").await,
Err(err) => {
error!("{}", err);
answer_after_keyboard(cx, "A database query error has occurred 😐").await
Expand Down Expand Up @@ -102,8 +100,6 @@ async fn start(
}
};

info!("{:?}", command);

match command {
Command::Start | Command::Help => {
cx.answer(Command::descriptions()).await?;
Expand Down Expand Up @@ -166,6 +162,7 @@ async fn recieve_balance(
}

if let Ok(address) = address.parse::<AccountAddress>() {
debug!("{:?} {}", state, address);
match state {
Balance => {
get_account_balance(&address, cx).await?;
Expand Down
41 changes: 19 additions & 22 deletions src/updates.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::{db, pg_pool, redis_cm, sender::Message, types::*, utils::*};
use crate::{db, redis_cm, sender::Message, types::*, utils::*};
use log::*;
use redis::AsyncCommands;
use sqlx::{postgres::PgListener, Pool, Postgres};
use std::collections::HashMap;
use redis::{aio::ConnectionManager, AsyncCommands};
use sqlx::postgres::PgListener;
use tokio::sync::mpsc::Sender;

const TX_CHANNEL: &str = "tx_channel";

pub async fn handle_updates(tx: Sender<Message>) -> Result<(), sqlx::Error> {
let pool = pg_pool().await;
let mut cm = redis_cm().await.clone();
let mut listener = PgListener::connect(&env("POSTGRESQL_URL")).await?;

info!("Listening channel {}", TX_CHANNEL);
Expand All @@ -19,25 +18,23 @@ pub async fn handle_updates(tx: Sender<Message>) -> Result<(), sqlx::Error> {
let json = n11.payload();
let update: AccountUpdate = serde_json::from_str(json).unwrap();
debug!("{:?}", update);
let subscriptions = db::all_subscriptions(pool).await?;
handle_update(&tx, update, &subscriptions).await;
handle_update(&tx, update, &mut cm).await;
}
}

/// Processes account updates since last handled account transaction index.
pub async fn process_updates_since_last_ati(tx: Sender<Message>, pool: &Pool<Postgres>) {
let mut conn = redis_cm().await.clone();
let index_id: Option<i64> = conn.get("ati:latest").await.unwrap();
pub async fn process_updates_since_last_ati(tx: Sender<Message>) {
let mut cm = redis_cm().await.clone();
let index_id: Option<i64> = cm.get("ati:latest").await.unwrap();

if let Some(index_id) = index_id {
info!("Last account transaction index ID={}", index_id);
info!("Last account transaction index ID {}", index_id);
let updates = db::account_updates_since(index_id).await.unwrap();

if updates.len() > 0 {
info!("Processing {} account updates", updates.len());
let subscriptions = db::all_subscriptions(pool).await.unwrap();
for update in updates {
handle_update(&tx, update, &subscriptions).await;
handle_update(&tx, update, &mut cm).await;
}
} else {
info!("No account updates found");
Expand All @@ -47,11 +44,8 @@ pub async fn process_updates_since_last_ati(tx: Sender<Message>, pool: &Pool<Pos
}
}

async fn handle_update(
tx: &Sender<Message>,
update: AccountUpdate,
subscriptions: &HashMap<String, Vec<i64>>,
) {
/// Handles update for account.
async fn handle_update(tx: &Sender<Message>, update: AccountUpdate, cm: &mut ConnectionManager) {
use TransactionType::*;

match update.summary {
Expand All @@ -74,7 +68,8 @@ async fn handle_update(
..
} => match event_for(events, &update.account) {
Some(Event::Transferred { from, to, amount }) => {
if let Some(subscriber_ids) = subscriptions.get(&to.to_string()) {
if let Some(subscriber_ids) = db::subscriber_ids(cm, &to.to_string()).await.unwrap()
{
let msg = format!(
"Transferred {} CCD from {} to {}\nTx Hash: {}\n{}Cost: {} CCD",
amount,
Expand All @@ -85,13 +80,14 @@ async fn handle_update(
cost
);

tx.send(Message::new(update.index_id, subscriber_ids.to_vec(), msg))
tx.send(Message::new(update.index_id, subscriber_ids, msg))
.await
.ok();
}
}
Some(Event::TransferredWithSchedule { from, to, amount }) => {
if let Some(subscriber_ids) = subscriptions.get(&to.to_string()) {
if let Some(subscriber_ids) = db::subscriber_ids(cm, &to.to_string()).await.unwrap()
{
let msg = format!(
"Transferred with schedule {} CCD from {} to {}\nTx Hash: {}\n{}Cost: {} CCD",
amount.total_amount(),
Expand All @@ -117,7 +113,8 @@ async fn handle_update(
.find(|r| r.address == update.account.address());

if let Some(reward) = reward {
if let Some(subscriber_ids) = subscriptions.get(&reward.address.to_string()) {
if let Some(subscriber_ids) = db::subscriber_ids(cm, &reward.address).await.unwrap()
{
let msg = format!("Baker reward {} CCD", reward.amount,);
tx.send(Message::new(update.index_id, subscriber_ids.to_vec(), msg))
.await
Expand Down

0 comments on commit f34b89b

Please sign in to comment.