Skip to content

Commit

Permalink
start koreader sync effort
Browse files Browse the repository at this point in the history
relates to #239
  • Loading branch information
aaronleopold committed Oct 17, 2024
1 parent 75f97cc commit ee26786
Show file tree
Hide file tree
Showing 16 changed files with 488 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ lettre = { version = "0.11.4", default-features = false, features = [
"tracing",
"tokio1-rustls-tls",
] }
md5 = "0.7.0"
once_cell = "1.19.0"
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.6.11", features = [
"sqlite-create-many",
Expand Down
10 changes: 0 additions & 10 deletions apps/server/src/config/state.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
use std::sync::Arc;

use axum::extract::State;
use axum_macros::FromRequestParts;
use stump_core::Ctx;

// TODO: I don't feel like I need this module... Unless I add things to it..
pub type AppState = Arc<Ctx>;

// TODO: is this how to fix the FIXME note in auth extractor?
#[derive(FromRequestParts, Clone)]
pub struct _AppState {
#[allow(unused)]
core_ctx: State<AppState>,
}
9 changes: 9 additions & 0 deletions apps/server/src/routers/koreader/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use axum::Router;

use crate::config::state::AppState;

mod sync;

pub(crate) fn mount(app_state: AppState) -> Router<AppState> {
Router::new().nest("/koreader", sync::mount(app_state))
}
346 changes: 346 additions & 0 deletions apps/server/src/routers/koreader/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,346 @@
use axum::{
extract::{Path, Request, State},
http::HeaderMap,
middleware::{self, Next},
response::{IntoResponse, Json, Response},
routing::{get, put},
Extension, Router,
};
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use stump_core::{
db::entity::macros::{
finished_session_koreader, reading_session_koreader, user_password_select,
},
prisma::{
active_reading_session, finished_reading_session, media,
registered_reading_device, user,
},
};

use crate::{
config::state::AppState,
errors::{APIError, APIResult},
utils::verify_password,
};

// TODO: healthcheck?

/// Mounts the koreader sync router at `/koreader` (from the parent router). The endpoints are
/// derived from the official koreader API.
///
/// See https://github.com/koreader/koreader-sync-server/blob/master/config/routes.lua
pub(crate) fn mount(app_state: AppState) -> Router<AppState> {
Router::new()
// TODO: create? I guess that can be another config?
.route("/users/auth", get(check_authorized))
.route("/syncs/progress", put(put_progress))
.route("/syncs/progress/:document", get(get_progress))
.layer(middleware::from_fn_with_state(app_state, authorize))
}

#[derive(Debug, Clone)]
struct KoReaderUser {
id: String,
username: String,
}

async fn authorize(
State(ctx): State<AppState>,
headers: HeaderMap,
mut req: Request,
next: Next,
) -> Result<Response, impl IntoResponse> {
let username = headers
.get("x-auth-user")
.ok_or(APIError::Unauthorized)?
.to_str()
.map_err(|_| APIError::BadRequest("Failed to parse username".to_string()))?;

let password = headers
.get("x-auth-key")
.ok_or(APIError::Unauthorized)?
.to_str()
.map_err(|_| APIError::BadRequest("Failed to parse password".to_string()))?;

tracing::debug!(username, "authorizing user");

let client = &ctx.db;

let user = client
.user()
.find_first(vec![
user::username::equals(username.to_string()),
user::deleted_at::equals(None),
user::is_locked::equals(false),
])
.select(user_password_select::select())
.exec()
.await?
.ok_or(APIError::Unauthorized)?;

verify_password(&user.hashed_password, password)?;

req.extensions_mut().insert(KoReaderUser {
id: user.id,
username: user.username,
});

Ok::<_, APIError>(next.run(req).await)
}

#[derive(Serialize)]
struct CheckAuthorizedResponse {
authorized: String,
}

async fn check_authorized() -> APIResult<Json<CheckAuthorizedResponse>> {
Ok(Json(CheckAuthorizedResponse {
authorized: "OK".to_string(),
}))
}

#[skip_serializing_none]
#[derive(Default, Serialize, Deserialize)]
struct GetProgressResponse {
/// A hash of the book, generated by koreader. This is used as an alternative method for
/// identifying a book, since koreader is unaware of the book's ID in stump
document: String,
// TODO(koreader): figure out what this is? A string??
progress: Option<String>,
// TODO(koreader): ensure the range is correct (0-1.0)
/// The reading progress of the book, as a percentage (0-1.0)
percentage: Option<f32>,
/// The name of the koreader device.
device: Option<String>,
/// The ID of the koreader device, generated by koreader. Stump will use this to identify a
/// registered reading device, if one exists with the ID.
device_id: Option<String>,
// TODO(koreader): ensure the format is correct (milliseconds since Unix epoch is assumption)
/// The timestamp of the last progress update, in milliseconds since the Unix epoch.
timestamp: Option<u64>,
}

async fn get_progress(
State(ctx): State<AppState>,
Extension(user): Extension<KoReaderUser>,
Path(document): Path<String>,
) -> APIResult<Json<GetProgressResponse>> {
let client = &ctx.db;
let document_cpy = document.clone();

let (active_session, finished_session) = client
._transaction()
.run(|tx| async move {
let active_session = tx
.active_reading_session()
.find_first(vec![
active_reading_session::user_id::equals(user.id.clone()),
active_reading_session::media::is(vec![
media::koreader_hash::equals(Some(document_cpy.clone())),
]),
])
.include(reading_session_koreader::include())
.exec()
.await?;

tx.finished_reading_session()
.find_first(vec![
finished_reading_session::user_id::equals(user.id.clone()),
finished_reading_session::media::is(vec![
media::koreader_hash::equals(Some(document_cpy)),
]),
])
.include(finished_session_koreader::include())
.exec()
.await
.map(|session| (active_session, session))
})
.await?;

// TODO(koreader): progress string (what does it mean!? lol)
let progress = match (active_session, finished_session) {
(Some(active_session), _) => GetProgressResponse {
document,
percentage: active_session.percentage_completed.map(|p| p as f32),
timestamp: Some(active_session.updated_at.timestamp_millis() as u64),
device: active_session.device.as_ref().map(|d| d.name.clone()),
device_id: active_session.device.as_ref().map(|d| d.id.clone()),
..Default::default()
},
(_, Some(finished_session)) => GetProgressResponse {
document,
percentage: Some(1.0),
timestamp: Some(finished_session.completed_at.timestamp_millis() as u64),
device: finished_session.device.as_ref().map(|d| d.name.clone()),
device_id: finished_session.device.as_ref().map(|d| d.id.clone()),
..Default::default()
},
_ => GetProgressResponse {
document,
..Default::default()
},
};

Ok(Json(progress))
}

#[derive(Deserialize)]
struct PutProgressInput {
document: String,
progress: String,
percentage: f32,
device: String,
device_id: String,
}

#[derive(Deserialize, Serialize)]
struct PutProgressResponse {
document: String,
timestamp: u64,
}

async fn put_progress(
State(ctx): State<AppState>,
Extension(user): Extension<KoReaderUser>,
Json(PutProgressInput {
document,
progress,
percentage,
device,
device_id,
}): Json<PutProgressInput>,
) -> APIResult<Json<PutProgressResponse>> {
let client = &ctx.db;

if !(0.0..=1.0).contains(&percentage) {
tracing::error!(
percentage,
"Invalid percentage provided for progress update"
);
return Err(APIError::BadRequest("Invalid percentage".to_string()));
}

let book = client
.media()
.find_first(vec![media::koreader_hash::equals(Some(document.clone()))])
.exec()
.await?
.ok_or_else(|| APIError::NotFound("Book not found".to_string()))?;

let is_completed = percentage == 1.0;
let document_cpy = document.clone();
let (active_session, finished_session) = client
._transaction()
.run(|tx| async move {
let _device_record = tx
.registered_reading_device()
.upsert(
registered_reading_device::id::equals(device_id.clone()),
(
device.clone(),
vec![registered_reading_device::id::set(device_id.clone())],
),
vec![registered_reading_device::name::set(device.clone())],
)
.exec()
.await?;

let existing_active_session = tx
.active_reading_session()
.find_first(vec![
active_reading_session::user_id::equals(user.id.clone()),
active_reading_session::media::is(vec![
media::koreader_hash::equals(Some(document_cpy.clone())),
]),
])
.exec()
.await?;

if is_completed {
if let Some(ref active_session) = existing_active_session {
tx.active_reading_session()
.delete(active_reading_session::id::equals(
active_session.id.clone(),
))
.exec()
.await?;
}

tx.finished_reading_session()
.create(
existing_active_session
.map(|s| s.started_at)
.unwrap_or_default(),
media::id::equals(book.id.clone()),
user::id::equals(user.id.clone()),
vec![finished_reading_session::device::connect(
registered_reading_device::id::equals(device_id.clone()),
)],
)
.exec()
.await
.map(|session| (None, Some(session)))
} else {
tx.active_reading_session()
.upsert(
active_reading_session::user_id_media_id(
user.id.clone(),
book.id.clone(),
),
(
media::id::equals(book.id.clone()),
user::id::equals(user.id.clone()),
vec![
active_reading_session::koreader_progress::set(Some(
progress.clone(),
)),
active_reading_session::percentage_completed::set(Some(
percentage as f64,
)),
// TODO(koreader): ensure this is correct
active_reading_session::device::connect(
registered_reading_device::id::equals(
device_id.clone(),
),
),
],
),
vec![
active_reading_session::koreader_progress::set(Some(
progress.clone(),
)),
active_reading_session::percentage_completed::set(Some(
percentage as f64,
)),
// TODO(koreader): ensure this is correct
active_reading_session::device::connect(
registered_reading_device::id::equals(device_id.clone()),
),
],
)
.exec()
.await
.map(|session| (Some(session), None))
}
})
.await?;

let timestamp = match (active_session, finished_session) {
(Some(active_session), _) => active_session.updated_at.timestamp_millis() as u64,
(_, Some(finished_session)) => {
finished_session.completed_at.timestamp_millis() as u64
},
_ => {
tracing::error!("Failed to update progress!");
return Err(APIError::InternalServerError(
"Failed to update progress".to_string(),
));
},
};

Ok(Json(PutProgressResponse {
document,
timestamp,
}))
}
Loading

0 comments on commit ee26786

Please sign in to comment.