Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Feature/optimize redis stuffs #14

Merged
merged 14 commits into from
Feb 2, 2023
379 changes: 378 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,14 @@ clap = { version = "4.0.30", features = ["derive"] }
[[example]]
name = "redis_zscore"
path = "examples/redis_zscore.rs"

[[example]]
name = "redis_zset"
path = "examples/redis_zset.rs"

[dev-dependencies]
criterion = "0.3"

[[bench]]
name = "my_benchmark"
harness = false
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@ static:
cargo build --target=x86_64-unknown-linux-musl --release

test-announce:
curl -v "localhost:6969/announce?info_hash=AAAAAAAAAAAAAAAAAAAB&port=3333&left=0"
curl -v "localhost:6969/announce?info_hash=AAAAAAAAAAAAAAAAAAAB&port=3333&left=0"

# kiryuu & redis should be running
gauge:
docker run -e KIRYUU_HOST=http://172.17.0.1:6969 -e REDIS_HOST=redis://172.17.0.1:6379 ghcr.io/ckcr4lyf/kiryuu-gauge:master
3 changes: 3 additions & 0 deletions benches/bruvva.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/// Placeholder helper for the benchmark module; intentionally does nothing.
/// (The original bound an unused local `x`, which only produced an
/// `unused_variables` warning — removed.)
pub fn xd() {}
19 changes: 19 additions & 0 deletions benches/my_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
mod bruvva;

/// Naive doubly-recursive Fibonacci, used purely as a CPU-bound benchmark
/// workload. Defined so that fibonacci(0) == 1 and fibonacci(1) == 1.
/// Deliberately NOT memoized/iterative — the exponential recursion is the
/// point of the benchmark.
#[inline]
fn fibonacci(n: u64) -> u64 {
    if n < 2 {
        1
    } else {
        fibonacci(n - 1) + fibonacci(n - 2)
    }
}

use criterion::{black_box, criterion_group, criterion_main, Criterion};

// Benchmark entry point: measures the naive recursive fibonacci at n = 20.
// black_box keeps the optimizer from const-folding the argument (and with it
// the whole computation) away.
pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("fib 20", |b| b.iter(|| fibonacci(black_box(20))));
}

// Register the benchmark group and generate the harness's main() —
// Cargo.toml sets `harness = false` for this bench target, so criterion
// supplies the entry point.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
5 changes: 4 additions & 1 deletion examples/redis_zscore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ fn main(){
_ => true,
};

println!("exist_highlevel is {:?}, exist_lowlevel is {:?}", exist_highlevel, exist_lowlevel)
println!("exist_highlevel is {:?}, exist_lowlevel is {:?}", exist_highlevel, exist_lowlevel);

// let exist_bool: bool = r_client.zscore("thezset", "KEY2").unwrap();
// println!("bool coerced is {:?}", exist_bool);
}
79 changes: 79 additions & 0 deletions examples/redis_zset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use redis::{self, Commands};

/// Whether a Redis lookup found the member: a Nil reply maps to `No`,
/// anything else to `Yes` (see the `FromRedisValue` impl below).
/// Derives added (Clone/Copy/PartialEq/Eq) — a two-variant fieldless enum is
/// trivially copyable and comparable, and this costs nothing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Exists {
    Yes,
    No,
}

// Decode a raw Redis reply into Exists: only a Nil reply means the member
// was absent; any other reply value counts as present. Never fails.
impl redis::FromRedisValue for Exists {
    fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Exists> {
        if let redis::Value::Nil = *v {
            Ok(Exists::No)
        } else {
            Ok(Exists::Yes)
        }
    }
}

/// Example: query a sorted set with ZRANGEBYSCORE two ways — via the
/// low-level `redis::Value` reply (unpacking the bulk array by hand) and via
/// the high-level typed API — and pack results into fixed-size buffers.
///
/// Fixes over the original:
/// - the second loop indexed `seeders_v2[pos]` without any bound, panicking
///   as soon as the zset held more than 50 members (the first loop had a
///   `pos >= 300` guard; this one had none);
/// - the manual byte-copy panicked on members shorter than 6 bytes; such
///   malformed members are now skipped.
fn main(){
    let mut r_client = redis::Client::open("redis://127.0.0.1:6379").unwrap();

    // Low-level path: fetch the raw reply and unpack the bulk array ourselves.
    let zrange_result = match r_client.zrangebyscore::<_, _, _, redis::Value>("MYKEY", "0", "101").unwrap() {
        redis::Value::Bulk(v1) => v1,
        _ => vec![]
    };

    // Flat buffer of up to 50 six-byte entries (presumably ip:port — TODO confirm).
    let mut seeders = [0_u8; 50 * 6];
    let mut pos = 0;

    for element in &zrange_result {
        if let redis::Value::Data(xd) = element {
            // Skip members that are too short instead of panicking on xd[i].
            if xd.len() >= 6 {
                seeders[pos..pos + 6].copy_from_slice(&xd[..6]);
                pos += 6;
            }
        }

        if pos >= 300 {
            break; // buffer full: 50 entries of 6 bytes
        }
    }

    println!("Manual seeders are {:?}", seeders);

    // Alternative: keep borrowed references to each member's bytes instead of copying.
    let dummy: Vec<u8> = vec![0u8; 1];
    let mut seeders_v2: [&Vec<u8>; 50] = [&dummy; 50];

    pos = 0;

    for element in &zrange_result {
        // FIX: bound the index — the original overran the 50-slot array.
        if pos >= 50 {
            break;
        }
        if let redis::Value::Data(xd) = element {
            seeders_v2[pos] = xd;
            pos += 1;
        }
    }

    println!("Manual seeders 2 are {:?}", seeders_v2);

    // High-level path: let the redis crate coerce the reply into Vec<Vec<u8>>.
    let og: Vec<Vec<u8>> = r_client.zrangebyscore("MYKEY", "0", "101").unwrap();
    println!("og is {:?}", og);
}

/* Manual testing:
FLUSHDB
ZADD MYKEY 10 ABCDEF
ZADD MYKEY 10 XXXXXX
*/
1 change: 1 addition & 0 deletions src/constants/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Redis key names for tracker-wide counters and indexes.
pub const ANNOUNCE_COUNT_KEY: &str = "kiryuu_http_announce_count"; // total announce requests served
pub const NOCHANGE_ANNOUNCE_COUNT_KEY: &str = "kiryuu_http_nochange_announce_count"; // If no change to seeder_count / leecher_count
pub const CACHE_HIT_ANNOUNCE_COUNT_KEY: &str = "kiryuu_http_cache_hit_announce_count"; // announces answered from the cached reply instead of ZRANGEBYSCORE
pub const REQ_DURATION_KEY: &str = "kiryuu_http_req_seconds_sum"; // cumulative announce handling time (INCRBY per request)
pub const TORRENTS_KEY: &str = "TORRENTS"; // zset of info_hashes scored by last-announce timestamp (ms)
136 changes: 74 additions & 62 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ mod constants;
mod req_log;

use actix_web::{get, App, HttpServer, web, HttpRequest, HttpResponse, http::header, http::StatusCode};
use rand::{thread_rng, prelude::SliceRandom, Rng};
use std::time::{SystemTime, UNIX_EPOCH};
use clap::Parser;

Expand All @@ -29,6 +28,21 @@ struct Args {
// So dont waste bandwidth on redis query etc.
const THIRTY_ONE_MINUTES: i64 = 60 * 31 * 1000;

// Result of a ZSCORE membership probe: distinguishes "member present" from
// "absent" without caring about the actual score value.
#[derive(Debug)]
enum Exists {
    Yes,
    No,
}

impl redis::FromRedisValue for Exists {
    fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Exists> {
        // A Nil reply means the member was not in the sorted set; any other
        // reply (the score) means it was. This conversion never fails.
        match *v {
            redis::Value::Nil => Ok(Exists::No),
            _ => Ok(Exists::Yes),
        }
    }
}

#[get("/announce")]
async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {

Expand Down Expand Up @@ -64,26 +78,15 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {

// Get seeders & leechers
let mut rc = data.redis_connection.clone();

let seeders_key = parsed.info_hash.clone() + "_seeders";
let leechers_key = parsed.info_hash.clone() + "_leechers";
let cache_key = parsed.info_hash.clone() + "_cache";

let (seeders, mut leechers) : (Vec<Vec<u8>>, Vec<Vec<u8>>) = redis::pipe()
.cmd("ZRANGEBYSCORE").arg(&seeders_key).arg(max_limit).arg(time_now_ms)
.cmd("ZRANGEBYSCORE").arg(&leechers_key).arg(max_limit).arg(time_now_ms)
.query_async(&mut rc).await.unwrap();

let is_seeder = seeders.contains(&parsed.ip_port);

let is_leecher = match is_seeder {
true => false, // If it's a seeder, leecher must be false
false => leechers.contains(&parsed.ip_port), // otherwise we will search the leecher vector as well
};

// Don't shuffle the seeders, for leechers shuffle so that the older ones also get a shot
// e.g. if there are 1000 leechers, the one whom announced 29 minutes ago also has a fair
// change of being announced to a peer, to help swarm
leechers.shuffle(&mut thread_rng());
let (is_seeder_v2, is_leecher_v2, cached_reply) : (Exists, Exists, Vec<u8>) = redis::pipe()
.cmd("ZSCORE").arg(&seeders_key).arg(&parsed.ip_port)
.cmd("ZSCORE").arg(&leechers_key).arg(&parsed.ip_port)
.cmd("GET").arg(&cache_key)
.query_async(&mut rc).await.unwrap();

let mut post_announce_pipeline = redis::pipe();
post_announce_pipeline.cmd("ZADD").arg(constants::TORRENTS_KEY).arg(time_now_ms).arg(&parsed.info_hash).ignore(); // To "update" the torrent
Expand All @@ -97,67 +100,88 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {
None => "unknown",
};

// let bg_rc = data.redis_connection.clone();

if event == "stopped" {
if is_seeder {
if let Exists::Yes = is_seeder_v2 {
seed_count_mod -= 1;
post_announce_pipeline.cmd("ZREM").arg(&seeders_key).arg(&parsed.ip_port).ignore(); // We dont care about the return value
} else if is_leecher {
} else if let Exists::Yes = is_leecher_v2 {
leech_count_mod -= 1;
post_announce_pipeline.cmd("ZREM").arg(&leechers_key).arg(&parsed.ip_port).ignore(); // We dont care about the return value
}
} else if parsed.is_seeding {

// New seeder
if is_seeder == false {
if let Exists::No = is_seeder_v2 {
post_announce_pipeline.cmd("ZADD").arg(&seeders_key).arg(time_now_ms).arg(&parsed.ip_port).ignore();
seed_count_mod += 1
seed_count_mod += 1;
}

// They just completed
if event == "completed" {
// If they were previously leecher, remove from that pool
if is_leecher {
if let Exists::Yes = is_leecher_v2 {
post_announce_pipeline.cmd("ZREM").arg(&leechers_key).arg(&parsed.ip_port).ignore();
leech_count_mod -= 1
}

// Increment the downloaded count for the infohash stats
post_announce_pipeline.cmd("HINCRBY").arg(&parsed.info_hash).arg("downloaded").arg(1u32).ignore();
}
} else if is_leecher == false {
} else if let Exists::No = is_leecher_v2 {
post_announce_pipeline.cmd("ZADD").arg(&leechers_key).arg(time_now_ms).arg(&parsed.ip_port).ignore();
leech_count_mod += 1
leech_count_mod += 1;
};

// Update seeder & leecher count, if required
if seed_count_mod != 0 {
post_announce_pipeline.cmd("HINCRBY").arg(&parsed.info_hash).arg("seeders").arg(seed_count_mod).ignore();
}
// Cache miss = query redis
// no change = update cache
// change = clear cache

let final_res = match cached_reply.len() {
0 => {
// Cache miss. Lookup from redis
let (seeders, leechers) : (Vec<Vec<u8>>, Vec<Vec<u8>>) = redis::pipe()
.cmd("ZRANGEBYSCORE").arg(&seeders_key).arg(max_limit).arg(time_now_ms)
.cmd("ZRANGEBYSCORE").arg(&leechers_key).arg(max_limit).arg(time_now_ms)
.query_async(&mut rc).await.unwrap();

// endex = end index XD. seems in rust cannot select first 50 elements, or limit to less if vector doesnt have 50
// e.g. &seeders[0..50] is panicking when seeders len is < 50. Oh well.
let seeder_endex = std::cmp::min(seeders.len(), 50);
let leecher_endex = std::cmp::min(leechers.len(), 50);

query::announce_reply(seeders.len() as i64 + seed_count_mod, leechers.len() as i64 + leech_count_mod, &seeders[0..seeder_endex], &leechers[0..leecher_endex])
},
_ => {
post_announce_pipeline.cmd("INCR").arg(constants::CACHE_HIT_ANNOUNCE_COUNT_KEY).ignore();
cached_reply
}
};

if leech_count_mod != 0 {
post_announce_pipeline.cmd("HINCRBY").arg(&parsed.info_hash).arg("leechers").arg(leech_count_mod).ignore();
}
// Is there a change in seeders / leechers
if seed_count_mod != 0 || leech_count_mod != 0 {
// TBD: Maybe we can issue the HINCRBY anyway, it is:
// Pipelined
// In background (not .awaited for announce reply)
// O(1) in redis
// Can clean up this branching crap
if seed_count_mod != 0 {
post_announce_pipeline.cmd("HINCRBY").arg(&parsed.info_hash).arg("seeders").arg(seed_count_mod).ignore();
}

// endex = end index XD. seems in rust cannot select first 50 elements, or limit to less if vector doesnt have 50
// e.g. &seeders[0..50] is panicking when seeders len is < 50. Oh well.
let seeder_endex = if seeders.len() >= 50 {
50
} else {
seeders.len()
};
if leech_count_mod != 0 {
post_announce_pipeline.cmd("HINCRBY").arg(&parsed.info_hash).arg("leechers").arg(leech_count_mod).ignore();
}

let leecher_endex = if leechers.len() >= 50 {
50
// TODO: Patch cached reply with the count mods?
// Also invalidate existing cache
post_announce_pipeline.cmd("DEL").arg(&cache_key).ignore();
} else {
leechers.len()
};

let scount: i64 = seeders.len().try_into().expect("fucky wucky");
let lcount: i64 = leechers.len().try_into().expect("fucky wucky");
post_announce_pipeline.cmd("INCR").arg(constants::NOCHANGE_ANNOUNCE_COUNT_KEY).ignore();
// TBD: If we had a cache hit, any point to set it again?
// For now we are ok, since background pipeline, O(1) in redis.
post_announce_pipeline.cmd("SET").arg(&cache_key).arg(&final_res).arg("EX").arg(60 * 30).ignore();
}

let bruvva_res = query::announce_reply(scount + seed_count_mod, lcount + leech_count_mod, &seeders[0..seeder_endex], &leechers[0..leecher_endex]);

let time_end = SystemTime::now().duration_since(UNIX_EPOCH).expect("fucked up");
let time_end_ms: i64 = i64::try_from(time_end.as_millis()).expect("fucc");
Expand All @@ -167,20 +191,8 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {
post_announce_pipeline.cmd("INCR").arg(constants::ANNOUNCE_COUNT_KEY).ignore();
post_announce_pipeline.cmd("INCRBY").arg(constants::REQ_DURATION_KEY).arg(req_duration).ignore();

// If neither seeder count changed, neither leech count
// In future, we will use this to determine if we need to update the cache or not
if seed_count_mod == 0 && leech_count_mod == 0 {
post_announce_pipeline.cmd("INCR").arg(constants::NOCHANGE_ANNOUNCE_COUNT_KEY).ignore();
}

actix_web::rt::spawn(async move {
// 0.1% chance to trigger a clean,
let chance = rand::thread_rng().gen_range(0..1000);
if chance == 0 {
post_announce_pipeline.cmd("ZREMRANGEBYSCORE").arg(&seeders_key).arg(0).arg(max_limit).ignore();
post_announce_pipeline.cmd("ZREMRANGEBYSCORE").arg(&leechers_key).arg(0).arg(max_limit).ignore();
}

// log the summary
post_announce_pipeline.cmd("PUBLISH").arg("reqlog").arg(req_log::generate_csv(&user_ip_owned, &parsed.info_hash)).ignore();

Expand All @@ -194,7 +206,7 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {
};
});

return HttpResponse::build(StatusCode::OK).append_header(header::ContentType::plaintext()).body(bruvva_res);
return HttpResponse::build(StatusCode::OK).append_header(header::ContentType::plaintext()).body(final_res);
}

#[get("/healthz")]
Expand Down