diff --git a/Cargo.lock b/Cargo.lock index 7349fd8ac..77adf3c49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7601,6 +7601,7 @@ dependencies = [ "infer", "itertools 0.13.0", "libc", + "md5", "pdf", "pdfium-render", "prisma-client-rust", diff --git a/Cargo.toml b/Cargo.toml index 26b257a09..6f6927fe4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ lettre = { version = "0.11.4", default-features = false, features = [ "tracing", "tokio1-rustls-tls", ] } +md5 = "0.7.0" once_cell = "1.19.0" prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.6.11", features = [ "sqlite-create-many", diff --git a/apps/server/src/config/state.rs b/apps/server/src/config/state.rs index 91b3ca373..0ebec7116 100644 --- a/apps/server/src/config/state.rs +++ b/apps/server/src/config/state.rs @@ -1,15 +1,5 @@ use std::sync::Arc; -use axum::extract::State; -use axum_macros::FromRequestParts; use stump_core::Ctx; -// TODO: I don't feel like I need this module... Unless I add things to it.. pub type AppState = Arc; - -// TODO: is this how to fix the FIXME note in auth extractor? -#[derive(FromRequestParts, Clone)] -pub struct _AppState { - #[allow(unused)] - core_ctx: State, -} diff --git a/apps/server/src/routers/koreader/mod.rs b/apps/server/src/routers/koreader/mod.rs new file mode 100644 index 000000000..72955e61c --- /dev/null +++ b/apps/server/src/routers/koreader/mod.rs @@ -0,0 +1,9 @@ +use axum::Router; + +use crate::config::state::AppState; + +mod sync; + +pub(crate) fn mount(app_state: AppState) -> Router { + Router::new().nest("/koreader", sync::mount(app_state)) +} diff --git a/apps/server/src/routers/koreader/sync.rs b/apps/server/src/routers/koreader/sync.rs new file mode 100644 index 000000000..633b9e575 --- /dev/null +++ b/apps/server/src/routers/koreader/sync.rs @@ -0,0 +1,346 @@ +use axum::{ + extract::{Path, Request, State}, + http::HeaderMap, + middleware::{self, Next}, + response::{IntoResponse, Json, Response}, + routing::{get, put}, + Extension, Router, +}; +use serde::{Deserialize, Serialize}; +use serde_with::skip_serializing_none; +use stump_core::{ + db::entity::macros::{ + finished_session_koreader, reading_session_koreader, user_password_select, + }, + prisma::{ + active_reading_session, finished_reading_session, media, + registered_reading_device, user, + }, +}; + +use crate::{ + config::state::AppState, + errors::{APIError, APIResult}, + utils::verify_password, +}; + +// TODO: healthcheck? + +/// Mounts the koreader sync router at `/koreader` (from the parent router). The endpoints are +/// derived from the official koreader API. +/// +/// See https://github.com/koreader/koreader-sync-server/blob/master/config/routes.lua +pub(crate) fn mount(app_state: AppState) -> Router { + Router::new() + // TODO: create? I guess that can be another config? + .route("/users/auth", get(check_authorized)) + .route("/syncs/progress", put(put_progress)) + .route("/syncs/progress/:document", get(get_progress)) + .layer(middleware::from_fn_with_state(app_state, authorize)) +} + +#[derive(Debug, Clone)] +struct KoReaderUser { + id: String, + username: String, +} + +async fn authorize( + State(ctx): State, + headers: HeaderMap, + mut req: Request, + next: Next, +) -> Result { + let username = headers + .get("x-auth-user") + .ok_or(APIError::Unauthorized)? + .to_str() + .map_err(|_| APIError::BadRequest("Failed to parse username".to_string()))?; + + let password = headers + .get("x-auth-key") + .ok_or(APIError::Unauthorized)? + .to_str() + .map_err(|_| APIError::BadRequest("Failed to parse password".to_string()))?; + + tracing::debug!(username, "authorizing user"); + + let client = &ctx.db; + + let user = client + .user() + .find_first(vec![ + user::username::equals(username.to_string()), + user::deleted_at::equals(None), + user::is_locked::equals(false), + ]) + .select(user_password_select::select()) + .exec() + .await? + .ok_or(APIError::Unauthorized)?; + + verify_password(&user.hashed_password, password)?; + + req.extensions_mut().insert(KoReaderUser { + id: user.id, + username: user.username, + }); + + Ok::<_, APIError>(next.run(req).await) +} + +#[derive(Serialize)] +struct CheckAuthorizedResponse { + authorized: String, +} + +async fn check_authorized() -> APIResult> { + Ok(Json(CheckAuthorizedResponse { + authorized: "OK".to_string(), + })) +} + +#[skip_serializing_none] +#[derive(Default, Serialize, Deserialize)] +struct GetProgressResponse { + /// A hash of the book, generated by koreader. This is used as an alternative method for + /// identifying a book, since koreader is unaware of the book's ID in stump + document: String, + // TODO(koreader): figure out what this is? A string?? + progress: Option, + // TODO(koreader): ensure the range is correct (0-1.0) + /// The reading progress of the book, as a percentage (0-1.0) + percentage: Option, + /// The name of the koreader device. + device: Option, + /// The ID of the koreader device, generated by koreader. Stump will use this to identify a + /// registered reading device, if one exists with the ID. + device_id: Option, + // TODO(koreader): ensure the format is correct (milliseconds since Unix epoch is assumption) + /// The timestamp of the last progress update, in milliseconds since the Unix epoch. + timestamp: Option, +} + +async fn get_progress( + State(ctx): State, + Extension(user): Extension, + Path(document): Path, +) -> APIResult> { + let client = &ctx.db; + let document_cpy = document.clone(); + + let (active_session, finished_session) = client + ._transaction() + .run(|tx| async move { + let active_session = tx + .active_reading_session() + .find_first(vec![ + active_reading_session::user_id::equals(user.id.clone()), + active_reading_session::media::is(vec![ + media::koreader_hash::equals(Some(document_cpy.clone())), + ]), + ]) + .include(reading_session_koreader::include()) + .exec() + .await?; + + tx.finished_reading_session() + .find_first(vec![ + finished_reading_session::user_id::equals(user.id.clone()), + finished_reading_session::media::is(vec![ + media::koreader_hash::equals(Some(document_cpy)), + ]), + ]) + .include(finished_session_koreader::include()) + .exec() + .await + .map(|session| (active_session, session)) + }) + .await?; + + // TODO(koreader): progress string (what does it mean!? lol) + let progress = match (active_session, finished_session) { + (Some(active_session), _) => GetProgressResponse { + document, + percentage: active_session.percentage_completed.map(|p| p as f32), + timestamp: Some(active_session.updated_at.timestamp_millis() as u64), + device: active_session.device.as_ref().map(|d| d.name.clone()), + device_id: active_session.device.as_ref().map(|d| d.id.clone()), + ..Default::default() + }, + (_, Some(finished_session)) => GetProgressResponse { + document, + percentage: Some(1.0), + timestamp: Some(finished_session.completed_at.timestamp_millis() as u64), + device: finished_session.device.as_ref().map(|d| d.name.clone()), + device_id: finished_session.device.as_ref().map(|d| d.id.clone()), + ..Default::default() + }, + _ => GetProgressResponse { + document, + ..Default::default() + }, + }; + + Ok(Json(progress)) +} + +#[derive(Deserialize)] +struct PutProgressInput { + document: String, + progress: String, + percentage: f32, + device: String, + device_id: String, +} + +#[derive(Deserialize, Serialize)] +struct PutProgressResponse { + document: String, + timestamp: u64, +} + +async fn put_progress( + State(ctx): State, + Extension(user): Extension, + Json(PutProgressInput { + document, + progress, + percentage, + device, + device_id, + }): Json, +) -> APIResult> { + let client = &ctx.db; + + if !(0.0..=1.0).contains(&percentage) { + tracing::error!( + percentage, + "Invalid percentage provided for progress update" + ); + return Err(APIError::BadRequest("Invalid percentage".to_string())); + } + + let book = client + .media() + .find_first(vec![media::koreader_hash::equals(Some(document.clone()))]) + .exec() + .await? + .ok_or_else(|| APIError::NotFound("Book not found".to_string()))?; + + let is_completed = percentage == 1.0; + let document_cpy = document.clone(); + let (active_session, finished_session) = client + ._transaction() + .run(|tx| async move { + let _device_record = tx + .registered_reading_device() + .upsert( + registered_reading_device::id::equals(device_id.clone()), + ( + device.clone(), + vec![registered_reading_device::id::set(device_id.clone())], + ), + vec![registered_reading_device::name::set(device.clone())], + ) + .exec() + .await?; + + let existing_active_session = tx + .active_reading_session() + .find_first(vec![ + active_reading_session::user_id::equals(user.id.clone()), + active_reading_session::media::is(vec![ + media::koreader_hash::equals(Some(document_cpy.clone())), + ]), + ]) + .exec() + .await?; + + if is_completed { + if let Some(ref active_session) = existing_active_session { + tx.active_reading_session() + .delete(active_reading_session::id::equals( + active_session.id.clone(), + )) + .exec() + .await?; + } + + tx.finished_reading_session() + .create( + existing_active_session + .map(|s| s.started_at) + .unwrap_or_default(), + media::id::equals(book.id.clone()), + user::id::equals(user.id.clone()), + vec![finished_reading_session::device::connect( + registered_reading_device::id::equals(device_id.clone()), + )], + ) + .exec() + .await + .map(|session| (None, Some(session))) + } else { + tx.active_reading_session() + .upsert( + active_reading_session::user_id_media_id( + user.id.clone(), + book.id.clone(), + ), + ( + media::id::equals(book.id.clone()), + user::id::equals(user.id.clone()), + vec![ + active_reading_session::koreader_progress::set(Some( + progress.clone(), + )), + active_reading_session::percentage_completed::set(Some( + percentage as f64, + )), + // TODO(koreader): ensure this is correct + active_reading_session::device::connect( + registered_reading_device::id::equals( + device_id.clone(), + ), + ), + ], + ), + vec![ + active_reading_session::koreader_progress::set(Some( + progress.clone(), + )), + active_reading_session::percentage_completed::set(Some( + percentage as f64, + )), + // TODO(koreader): ensure this is correct + active_reading_session::device::connect( + registered_reading_device::id::equals(device_id.clone()), + ), + ], + ) + .exec() + .await + .map(|session| (Some(session), None)) + } + }) + .await?; + + let timestamp = match (active_session, finished_session) { + (Some(active_session), _) => active_session.updated_at.timestamp_millis() as u64, + (_, Some(finished_session)) => { + finished_session.completed_at.timestamp_millis() as u64 + }, + _ => { + tracing::error!("Failed to update progress!"); + return Err(APIError::InternalServerError( + "Failed to update progress".to_string(), + )); + }, + }; + + Ok(Json(PutProgressResponse { + document, + timestamp, + })) +} diff --git a/apps/server/src/routers/mod.rs b/apps/server/src/routers/mod.rs index 4c7beaf36..3f515c16d 100644 --- a/apps/server/src/routers/mod.rs +++ b/apps/server/src/routers/mod.rs @@ -3,6 +3,7 @@ use axum::Router; use crate::config::state::AppState; mod api; +mod koreader; mod opds; mod spa; mod sse; @@ -20,9 +21,12 @@ pub(crate) fn mount(app_state: AppState) -> Router { app_router = app_router.merge(utoipa::mount(app_state.clone())); } + if app_state.config.enable_koreader_sync { + app_router = app_router.merge(koreader::mount(app_state.clone())); + } + app_router .merge(spa::mount(app_state.clone())) - // .merge(ws::mount(app_state.clone())) .merge(sse::mount()) .merge(api::mount(app_state.clone())) .merge(opds::mount(app_state)) diff --git a/core/Cargo.toml b/core/Cargo.toml index fbeee6e4c..536b33121 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -18,6 +18,7 @@ globset = "0.4.14" image = { version = "0.25.2" } infer = "0.16.0" itertools = { workspace = true } +md5 = { workspace = true } prisma-client-rust = { workspace = true } rand = { workspace = true } serde = { workspace = true } diff --git a/core/integration-tests/data/leaves.epub b/core/integration-tests/data/leaves.epub new file mode 100644 index 000000000..0060d0882 Binary files /dev/null and b/core/integration-tests/data/leaves.epub differ diff --git a/core/integration-tests/data/tall.pdf b/core/integration-tests/data/tall.pdf new file mode 100644 index 000000000..c58ebc3d8 Binary files /dev/null and b/core/integration-tests/data/tall.pdf differ diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index 3e7925df4..0140e850e 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -119,6 +119,7 @@ model LibraryConfig { default_reading_mode String @default("paged") // paged or continuous:(horizontal|vertical) default_reading_image_scale_fit String @default("height") // height, width, none (original) generate_file_hashes Boolean @default(false) + generate_koreader_hashes Boolean @default(false) // TODO(koreader): determine if we need this much granularity process_metadata Boolean @default(true) library_pattern String @default("SERIES_BASED") // SERIES_BASED or COLLECTION_BASED @@ -188,17 +189,18 @@ model SeriesMetadata { model Media { id String @id @default(uuid()) - name String // derived from filename - size BigInt // in bytes - extension String - pages Int - updated_at DateTime @updatedAt - created_at DateTime @default(now()) - modified_at DateTime? // last modified date of the file - deleted_at DateTime? - hash String? // This is **not** an integrity check(sum), and is not used to verify the file contents. - path String - status String @default("READY") // UNKNOWN, READY, UNSUPPORTED, ERROR, MISSING + name String // derived from filename + size BigInt // in bytes + extension String + pages Int + updated_at DateTime @updatedAt + created_at DateTime @default(now()) + modified_at DateTime? // last modified date of the file + deleted_at DateTime? + hash String? // This is **not** an integrity check(sum), and is not used to verify the file contents. + koreader_hash String? // This is the hash used by KOReader to identify the file + path String + status String @default("READY") // UNKNOWN, READY, UNSUPPORTED, ERROR, MISSING metadata MediaMetadata? series Series? @relation(fields: [series_id], references: [id], onDelete: Cascade) @@ -296,15 +298,25 @@ model FinishedReadingSession { // TODO: Support reading duration in the future - media_id String - media Media @relation(fields: [media_id], references: [id], onDelete: Cascade) - - user_id String - user User @relation(fields: [user_id], references: [id], onDelete: Cascade) + media_id String + media Media @relation(fields: [media_id], references: [id], onDelete: Cascade) + user_id String + user User @relation(fields: [user_id], references: [id], onDelete: Cascade) + device_id String? + device RegisteredReadingDevice? @relation(fields: [device_id], references: [id], onDelete: Cascade) @@map("finished_reading_sessions") } +model RegisteredReadingDevice { + id String @id @default(cuid()) + name String @unique + kind String? // ex: "KOBO" + + active_reading_sessions ActiveReadingSession[] + finished_reading_sessions FinishedReadingSession[] +} + model ActiveReadingSession { id String @id @default(uuid()) @@ -313,15 +325,17 @@ model ActiveReadingSession { page Int? percentage_completed Float? // 0.0 - 1.0 epubcfi String? + koreader_progress String? started_at DateTime @default(now()) updated_at DateTime @updatedAt - media_id String - media Media @relation(fields: [media_id], references: [id], onDelete: Cascade) - - user_id String - user User @relation(fields: [user_id], references: [id], onDelete: Cascade) + media_id String + media Media @relation(fields: [media_id], references: [id], onDelete: Cascade) + user_id String + user User @relation(fields: [user_id], references: [id], onDelete: Cascade) + device_id String? + device RegisteredReadingDevice? @relation(fields: [device_id], references: [id], onDelete: Cascade) @@unique([user_id, media_id]) @@map("reading_sessions") diff --git a/core/src/config/stump_config.rs b/core/src/config/stump_config.rs index f2f44df89..50f094c5e 100644 --- a/core/src/config/stump_config.rs +++ b/core/src/config/stump_config.rs @@ -26,6 +26,7 @@ pub mod env_keys { pub const ORIGINS_KEY: &str = "STUMP_ALLOWED_ORIGINS"; pub const PDFIUM_KEY: &str = "PDFIUM_PATH"; pub const DISABLE_SWAGGER_KEY: &str = "DISABLE_SWAGGER_UI"; + pub const ENABLE_KOREADER_SYNC_KEY: &str = "ENABLE_KOREADER_SYNC"; pub const HASH_COST_KEY: &str = "HASH_COST"; pub const SESSION_TTL_KEY: &str = "SESSION_TTL"; pub const SESSION_EXPIRY_INTERVAL_KEY: &str = "SESSION_EXPIRY_CLEANUP_INTERVAL"; @@ -130,11 +131,17 @@ pub struct StumpConfig { #[env_key(PDFIUM_KEY)] pub pdfium_path: Option, + // TODO: rename to enable_swagger_ui for consistency and clarity (no double negatives) /// Indicates if the Swagger UI should be disabled. #[default_value(false)] #[env_key(DISABLE_SWAGGER_KEY)] pub disable_swagger: bool, + /// Indicates if the KoReader sync feature should be enabled. + #[default_value(false)] + #[env_key(ENABLE_KOREADER_SYNC_KEY)] + pub enable_koreader_sync: bool, + /// Password hash cost #[default_value(DEFAULT_PASSWORD_HASH_COST)] #[env_key(HASH_COST_KEY)] @@ -308,6 +315,7 @@ mod tests { allowed_origins: Some(vec!["origin1".to_string(), "origin2".to_string()]), pdfium_path: Some("not_a_path_to_pdfium".to_string()), disable_swagger: Some(false), + enable_koreader_sync: Some(false), password_hash_cost: None, session_ttl: None, access_token_ttl: None, @@ -342,6 +350,7 @@ mod tests { allowed_origins: Some(vec!["origin1".to_string(), "origin2".to_string()]), pdfium_path: Some("not_a_path_to_pdfium".to_string()), disable_swagger: Some(false), + enable_koreader_sync: Some(false), password_hash_cost: Some(DEFAULT_PASSWORD_HASH_COST), session_ttl: Some(DEFAULT_SESSION_TTL), access_token_ttl: Some(DEFAULT_ACCESS_TOKEN_TTL), @@ -393,6 +402,7 @@ mod tests { allowed_origins: vec![], pdfium_path: None, disable_swagger: true, + enable_koreader_sync: false, password_hash_cost: 1, session_ttl: DEFAULT_SESSION_TTL, access_token_ttl: DEFAULT_ACCESS_TOKEN_TTL, diff --git a/core/src/db/entity/media/prisma_macros.rs b/core/src/db/entity/media/prisma_macros.rs index e8d8b4773..c836f69f9 100644 --- a/core/src/db/entity/media/prisma_macros.rs +++ b/core/src/db/entity/media/prisma_macros.rs @@ -29,3 +29,7 @@ finished_reading_session::select!(finished_reading_session_series_complete { media_id completed_at }); + +active_reading_session::include!(reading_session_koreader { device }); + +finished_reading_session::include!(finished_session_koreader { device }); diff --git a/core/src/db/entity/user/prisma_macros.rs b/core/src/db/entity/user/prisma_macros.rs index e1a5e58b4..a680e9c89 100644 --- a/core/src/db/entity/user/prisma_macros.rs +++ b/core/src/db/entity/user/prisma_macros.rs @@ -7,3 +7,9 @@ user::select!(user_basic_profile_select { avatar_url created_at }); + +user::select!(user_password_select { + id + username + hashed_password +}); diff --git a/core/src/db/filter/smart_filter.rs b/core/src/db/filter/smart_filter.rs index 6a79d9dd9..9860b584d 100644 --- a/core/src/db/filter/smart_filter.rs +++ b/core/src/db/filter/smart_filter.rs @@ -543,6 +543,7 @@ mod tests { deleted_at: None, extension: "CBZ".to_string(), hash: None, + koreader_hash: None, metadata: None, modified_at: None, pages: 30, diff --git a/core/src/filesystem/hash.rs b/core/src/filesystem/hash.rs index 647427b6a..3bfef9d6d 100644 --- a/core/src/filesystem/hash.rs +++ b/core/src/filesystem/hash.rs @@ -1,6 +1,6 @@ use data_encoding::HEXLOWER; use ring::digest::{Context, SHA256}; -use std::io; +use std::io::{self, Read, Seek}; use tracing::debug; // use std::fs::File; @@ -59,23 +59,76 @@ pub fn generate(path: &str, bytes: u64) -> Result { Ok(encoded_digest) } -// pub fn generate_from_reader(mut reader: R) -> Result { -// let mut ring_context = Context::new(&SHA256); +/// Generate a hash for a file using a port of the Koreader hash algorithm, which is +/// originally written in Lua. The algorithm reads the file in 1KB chunks, starting +/// from the beginning, until it reaches the end of the file or 10 iterations. It isn't +/// overly complex. +/// +/// See https://github.com/koreader/koreader/blob/master/frontend/util.lua#L1046-L1072 +#[tracing::instrument(fields(path = %path.as_ref().display()))] +pub fn generate_koreader_hash>( + path: P, +) -> Result { + let mut file = std::fs::File::open(path)?; -// let mut buffer = [0; 1024]; + let mut md5_context = md5::Context::new(); -// loop { -// let count = reader.read(&mut buffer)?; + let step = 1024i64; + let size = 1024i64; -// // This reader has reached its "end of file" -// if count == 0 { -// break; -// } + for i in -1..=10 { + let offset = if i == -1 { 0 } else { step << (2 * i) }; + file.seek(std::io::SeekFrom::Start(offset as u64))?; -// ring_context.update(&buffer[..count]); -// } + let mut buffer = vec![0u8; size as usize]; + let bytes_read = file.read(&mut buffer)?; -// let digest = ring_context.finish(); + // println!("Offset: {}, Bytes Read: {}, i: {i}", offset, bytes_read,); -// Ok(HEXLOWER.encode(digest.as_ref())) -// } + if bytes_read == 0 { + tracing::trace!(?offset, "Reached end of file"); + break; + } + + md5_context.consume(&buffer); + } + + let hash = format!("{:x}", md5_context.compute()); + tracing::debug!(hash = %hash, "Generated hash"); + + Ok(hash) +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use super::*; + + fn epub_path() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("integration-tests/data/leaves.epub") + } + + fn pdf_path() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("integration-tests/data/tall.pdf") + } + + // https://github.com/koreader/koreader/blob/master/spec/unit/util_spec.lua#L339-L341 + #[test] + fn test_koreader_hash_epub() { + assert_eq!( + generate_koreader_hash(epub_path()).unwrap(), + "59d481d168cca6267322f150c5f6a2a3".to_string() + ) + } + + // https://github.com/koreader/koreader/blob/master/spec/unit/util_spec.lua#L342-L344 + #[test] + fn test_koreader_hash_pdf() { + assert_eq!( + generate_koreader_hash(pdf_path()).unwrap(), + "41cce710f34e5ec21315e19c99821415".to_string() + ) + } +} diff --git a/core/src/opds/v2_0/publication.rs b/core/src/opds/v2_0/publication.rs index 6692673d8..07cc8402e 100644 --- a/core/src/opds/v2_0/publication.rs +++ b/core/src/opds/v2_0/publication.rs @@ -318,6 +318,7 @@ mod tests { path: get_test_epub_path(), status: FileStatus::Ready.to_string(), hash: Some(String::from("hash")), + koreader_hash: None, series_id: Some("1".to_string()), pages: 0, modified_at: None,