From b0de2061787c37e20f46ef2c86034e54a25dab78 Mon Sep 17 00:00:00 2001 From: Augustus Mayo Date: Mon, 30 Dec 2024 12:20:16 -0500 Subject: [PATCH] Storage refactor (#242) * Work on refactoring storage an retrieval * Populate meta store * Add meta to mock store * Fix license * More vec filters * Use commit sha type throughout --- Cargo.lock | 2 + rfd-api/Cargo.toml | 2 + rfd-api/src/context.rs | 615 ++++++------------------- rfd-api/src/endpoints/meta.rs | 192 +++++++- rfd-api/src/endpoints/rfd.rs | 251 ++++++++-- rfd-api/src/main.rs | 7 +- rfd-api/src/server.rs | 15 +- rfd-cli/src/main.rs | 2 +- rfd-github/src/lib.rs | 22 + rfd-model/src/db.rs | 6 +- rfd-model/src/lib.rs | 38 +- rfd-model/src/storage/mock.rs | 236 ++++++++++ rfd-model/src/storage/mod.rs | 102 ++++- rfd-model/src/storage/postgres.rs | 733 ++++++++++++++++-------------- rfd-processor/src/context.rs | 3 +- rfd-processor/src/processor.rs | 4 +- rfd-processor/src/rfd.rs | 16 +- 17 files changed, 1312 insertions(+), 934 deletions(-) create mode 100644 rfd-model/src/storage/mock.rs diff --git a/Cargo.lock b/Cargo.lock index b308fded..5587e6e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2853,8 +2853,10 @@ name = "rfd-api" version = "0.8.0" dependencies = [ "anyhow", + "async-bb8-diesel", "async-trait", "base64 0.22.1", + "bb8", "chrono", "config", "cookie", diff --git a/rfd-api/Cargo.toml b/rfd-api/Cargo.toml index 7fb8c274..074539ac 100644 --- a/rfd-api/Cargo.toml +++ b/rfd-api/Cargo.toml @@ -9,8 +9,10 @@ local-dev = ["v-api/local-dev"] [dependencies] anyhow = { workspace = true } +async-bb8-diesel = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } +bb8 = { workspace = true } chrono = { workspace = true, features = ["serde"] } config = { workspace = true } cookie = { workspace = true } diff --git a/rfd-api/src/context.rs b/rfd-api/src/context.rs index 91e3e187..b756d7fe 100644 --- a/rfd-api/src/context.rs +++ b/rfd-api/src/context.rs @@ -17,10 +17,7 @@ use rfd_data::{ use rfd_github::{GitHubError, GitHubNewRfdNumber, GitHubRfdRepo}; use rfd_model::{ schema_ext::{ContentFormat, Visibility}, - storage::{ - JobStore, RfdFilter, RfdPdfFilter, RfdPdfStore, RfdRevisionFilter, RfdRevisionMetaStore, - RfdRevisionStore, RfdStore, - }, + storage::{JobStore, RfdFilter, RfdMetaStore, RfdPdfFilter, RfdPdfStore, RfdStorage, RfdStore}, CommitSha, FileSha, Job, NewJob, Rfd, RfdId, RfdRevision, }; use rsa::{ @@ -29,12 +26,14 @@ use rsa::{ }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use std::{cmp::Ordering, collections::BTreeSet, sync::Arc}; +use std::{cmp::Ordering, sync::Arc}; use tap::TapFallible; use thiserror::Error; use tracing::instrument; use v_api::{ - response::{resource_not_found, resource_restricted, ResourceResult, ToResourceResult}, + response::{ + resource_not_found, resource_restricted, ResourceError, ResourceResult, ToResourceResult, + }, ApiContext, VContext, }; use v_model::{ @@ -51,25 +50,9 @@ use crate::{ static UNLIMITED: i64 = 9999999; -pub trait Storage: - RfdStore + RfdRevisionStore + RfdPdfStore + RfdRevisionMetaStore + JobStore + Send + Sync + 'static -{ -} -impl Storage for T where - T: RfdStore - + RfdRevisionStore - + RfdPdfStore - + RfdRevisionMetaStore - + JobStore - + Send - + Sync - + 'static -{ -} - pub struct RfdContext { pub public_url: String, - pub storage: Arc, + pub storage: Arc, pub search: SearchContext, pub content: ContentContext, pub github: GitHubRfdRepo, @@ -107,9 +90,9 @@ pub enum UpdateRfdContentError { Storage(#[from] StoreError), } -#[partial(RfdMeta)] +#[partial(RfdWithoutContent)] #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] -pub struct FullRfd { +pub struct RfdWithContent { pub id: TypedUuid, pub rfd_number: i32, pub link: Option, @@ -118,19 +101,19 @@ pub struct FullRfd { pub state: Option, pub authors: Option, pub labels: Option, - #[partial(RfdMeta(skip))] + #[partial(RfdWithoutContent(skip))] pub content: String, pub format: ContentFormat, pub sha: FileSha, pub commit: CommitSha, pub committed_at: DateTime, - #[partial(RfdMeta(skip))] - pub pdfs: Vec, + #[partial(RfdWithoutContent(skip))] + pub pdfs: Vec, pub visibility: Visibility, } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] -pub struct FullRfdPdfEntry { +pub struct PdfEntry { pub source: String, pub link: String, } @@ -138,7 +121,7 @@ pub struct FullRfdPdfEntry { impl RfdContext { pub async fn new( public_url: String, - storage: Arc, + storage: Arc, search: SearchConfig, content: ContentConfig, services: ServicesConfig, @@ -216,81 +199,6 @@ impl RfdContext { // RFD Operations - pub async fn list_rfds( - &self, - caller: &Caller, - filter: Option, - ) -> ResourceResult, StoreError> { - // List all of the RFDs first and then perform filter. This should be be improved once - // filters can be combined to support OR expressions. Effectively we need to be able to - // express "Has access to OR is public" with a filter - let mut rfds = RfdStore::list( - &*self.storage, - filter.unwrap_or_default(), - &ListPagination::default().limit(UNLIMITED), - ) - .await - .tap_err(|err| tracing::error!(?err, "Failed to lookup RFDs")) - .to_resource_result()?; - - // Determine the list of RFDs the caller has direct access to - let direct_access_rfds = caller - .permissions - .iter() - .filter_map(|p| match p { - RfdPermission::GetRfd(number) => Some(*number), - _ => None, - }) - .collect::>(); - - // Filter the list of RFDs down to only those that the caller is allowed to access - rfds.retain_mut(|rfd| { - caller.can(&RfdPermission::GetRfdsAll) - || rfd.visibility == Visibility::Public - || direct_access_rfds.contains(&rfd.rfd_number) - }); - - // Fetch the latest revision for each of the RFDs that is to be returned - let mut rfd_revisions = RfdRevisionMetaStore::list_unique_rfd( - &*self.storage, - RfdRevisionFilter::default().rfd(Some(rfds.iter().map(|rfd| rfd.id).collect())), - &ListPagination::default().limit(UNLIMITED), - ) - .await - .tap_err(|err| tracing::error!(?err, "Failed to lookup RFD revisions")) - .to_resource_result()?; - - // Sort both the RFDs and revisions based on their RFD id to ensure they line up - rfds.sort_by(|a, b| a.id.cmp(&b.id)); - rfd_revisions.sort_by(|a, b| a.rfd_id.cmp(&b.rfd_id)); - - // Zip together the RFDs with their associated revision - let mut rfd_list = rfds - .into_iter() - .zip(rfd_revisions) - .map(|(rfd, revision)| RfdMeta { - id: rfd.id, - rfd_number: rfd.rfd_number, - link: rfd.link, - discussion: revision.discussion, - title: revision.title, - state: revision.state, - authors: revision.authors, - labels: revision.labels, - format: revision.content_format, - sha: revision.sha, - commit: revision.commit.into(), - committed_at: revision.committed_at, - visibility: rfd.visibility, - }) - .collect::>(); - - // Finally sort the RFD list by RFD number - rfd_list.sort_by(|a, b| b.rfd_number.cmp(&a.rfd_number)); - - Ok(rfd_list) - } - #[instrument(skip(self, caller), err(Debug))] pub async fn create_rfd( &self, @@ -368,143 +276,159 @@ impl RfdContext { } } - #[instrument(skip(self, caller))] - pub async fn get_rfd( + pub async fn list_rfds( &self, caller: &Caller, - rfd_number: i32, - sha: Option, - ) -> ResourceResult { - // list_rfds performs authorization checks, if the caller does not have access to the - // requested RFD an empty Vec will be returned - let rfds = self - .list_rfds( - caller, - Some(RfdFilter::default().rfd_number(Some(vec![rfd_number]))), - ) - .await?; + filter: Option, + ) -> ResourceResult, StoreError> { + // List all of the RFDs first and then perform filter. This should be be improved once + // filters can be combined to support OR expressions. Effectively we need to be able to + // express "Has access to OR is public" with a filter + let mut rfds = RfdMetaStore::list( + &*self.storage, + filter.map(|filter| vec![filter]).unwrap_or_default(), + &ListPagination::default().limit(UNLIMITED), + ) + .await + .tap_err(|err| tracing::error!(?err, "Failed to lookup RFDs")) + .to_resource_result()?; - if let Some(rfd) = rfds.into_iter().nth(0) { - // If list_rfds returned a RFD, then the caller is allowed to access that RFD and we - // can return the full RFD revision. This is sub-optimal as we are required to execute - // the revision lookup twice - let latest_revision = RfdRevisionStore::list( - &*self.storage, - RfdRevisionFilter::default() - .rfd(Some(vec![rfd.id])) - .sha(sha.map(|sha| vec![sha])), - &ListPagination::default().limit(1), - ) - .await - .to_resource_result()?; + // Filter the list of RFDs down to only those that the caller is allowed to access + rfds.retain_mut(|rfd| { + caller.can(&RfdPermission::GetRfdsAll) + || caller.can(&RfdPermission::GetRfd(rfd.rfd_number)) + || rfd.visibility == Visibility::Public + }); - if let Some(revision) = latest_revision.into_iter().nth(0) { - let pdfs = RfdPdfStore::list( - &*self.storage, - RfdPdfFilter::default().rfd_revision(Some(vec![revision.id])), - &ListPagination::default(), - ) - .await - .to_resource_result()?; + let mut rfd_list = rfds + .into_iter() + .map(|rfd| RfdWithoutContent { + id: rfd.id, + rfd_number: rfd.rfd_number, + link: rfd.link, + discussion: rfd.content.discussion, + title: rfd.content.title, + state: rfd.content.state, + authors: rfd.content.authors, + labels: rfd.content.labels, + format: rfd.content.content_format, + sha: rfd.content.sha, + commit: rfd.content.commit.into(), + committed_at: rfd.content.committed_at, + visibility: rfd.visibility, + }) + .collect::>(); - Ok(FullRfd { - id: rfd.id, - rfd_number: rfd.rfd_number, - link: rfd.link, - discussion: revision.discussion, - title: revision.title, - state: revision.state, - authors: revision.authors, - labels: revision.labels, - content: revision.content, - format: revision.content_format, - sha: revision.sha, - commit: revision.commit.into(), - committed_at: revision.committed_at, - pdfs: pdfs - .into_iter() - .map(|pdf| FullRfdPdfEntry { - source: pdf.source.to_string(), - link: pdf.link, - }) - .collect(), - visibility: rfd.visibility, - }) + // Finally sort the RFD list by RFD number + rfd_list.sort_by(|a, b| b.rfd_number.cmp(&a.rfd_number)); + + Ok(rfd_list) + } + + #[instrument(skip(self, caller))] + async fn get_rfd_by_number( + &self, + caller: &Caller, + rfd_number: i32, + commit: Option, + ) -> ResourceResult { + let rfd = RfdStore::list( + &*self.storage, + vec![RfdFilter::default() + .rfd_number(Some(vec![rfd_number])) + .commit(commit.map(|commit| vec![commit]))], + &ListPagination::latest(), + ) + .await + .to_resource_result()? + .pop(); + + if let Some(rfd) = rfd { + if caller.can(&RfdPermission::GetRfdsAll) + || caller.can(&RfdPermission::GetRfd(rfd.rfd_number)) + || rfd.visibility == Visibility::Public + { + Ok(rfd) } else { - // It should not be possible to reach this branch. If we have then the database - // has entered an inconsistent state - tracing::error!("Looking up revision for RFD returned no results"); resource_not_found() } } else { - // Either the RFD does not exist, or the caller is not allowed to access it resource_not_found() } } #[instrument(skip(self, caller))] - pub async fn get_rfd_meta( + pub async fn view_rfd( &self, caller: &Caller, rfd_number: i32, - sha: Option, - ) -> ResourceResult { - // list_rfds performs authorization checks, if the caller does not have access to the - // requested RFD an empty Vec will be returned - let rfds = self + commit: Option, + ) -> ResourceResult { + let rfd = self.get_rfd_by_number(caller, rfd_number, commit).await?; + let pdfs = RfdPdfStore::list( + &*self.storage, + vec![RfdPdfFilter::default().rfd_revision(Some(vec![rfd.content.id]))], + &ListPagination::default(), + ) + .await + .to_resource_result()?; + + Ok(RfdWithContent { + id: rfd.id, + rfd_number: rfd.rfd_number, + link: rfd.link, + discussion: rfd.content.discussion, + title: rfd.content.title, + state: rfd.content.state, + authors: rfd.content.authors, + labels: rfd.content.labels, + content: rfd.content.content, + format: rfd.content.content_format, + sha: rfd.content.sha, + commit: rfd.content.commit.into(), + committed_at: rfd.content.committed_at, + pdfs: pdfs + .into_iter() + .map(|pdf| PdfEntry { + source: pdf.source.to_string(), + link: pdf.link, + }) + .collect(), + visibility: rfd.visibility, + }) + } + + #[instrument(skip(self, caller))] + pub async fn view_rfd_meta( + &self, + caller: &Caller, + rfd_number: i32, + commit: Option, + ) -> ResourceResult { + let rfd = self .list_rfds( caller, - Some(RfdFilter::default().rfd_number(Some(vec![rfd_number]))), + Some( + RfdFilter::default() + .rfd_number(Some(vec![rfd_number])) + .commit(commit.map(|commit| vec![commit])), + ), ) - .await?; + .await? + .pop(); - if let Some(rfd) = rfds.into_iter().nth(0) { - Ok(rfd) - } else { - // Either the RFD does not exist, or the caller is not allowed to access it - resource_not_found() - } + rfd.ok_or(ResourceError::DoesNotExist) } #[instrument(skip(self, caller))] - pub async fn get_rfd_revision( + pub async fn view_rfd_revision( &self, caller: &Caller, rfd_number: i32, - sha: Option, + commit: Option, ) -> ResourceResult { - if caller.any(&[ - &RfdPermission::GetRfd(rfd_number), - &RfdPermission::GetRfdsAll, - ]) { - let rfds = RfdStore::list( - &*self.storage, - RfdFilter::default().rfd_number(Some(vec![rfd_number])), - &ListPagination::default().limit(1), - ) - .await - .to_resource_result()?; - if let Some(rfd) = rfds.into_iter().nth(0) { - let latest_revision = RfdRevisionStore::list( - &*self.storage, - RfdRevisionFilter::default() - .rfd(Some(vec![rfd.id])) - .sha(sha.map(|sha| vec![sha])), - &ListPagination::default().limit(1), - ) - .await - .to_resource_result()?; - - match latest_revision.into_iter().nth(0) { - Some(revision) => Ok(revision), - None => resource_not_found(), - } - } else { - resource_not_found() - } - } else { - resource_restricted() - } + let rfd = self.get_rfd_by_number(caller, rfd_number, commit).await?; + Ok(rfd.content) } async fn get_latest_rfd_revision( @@ -512,37 +436,7 @@ impl RfdContext { caller: &Caller, rfd_number: i32, ) -> ResourceResult { - if caller.any(&[ - &RfdPermission::GetRfd(rfd_number), - &RfdPermission::GetRfdsAll, - ]) { - let rfds = RfdStore::list( - &*self.storage, - RfdFilter::default().rfd_number(Some(vec![rfd_number])), - &ListPagination::default().limit(1), - ) - .await - .to_resource_result()?; - - if let Some(rfd) = rfds.into_iter().nth(0) { - let revisions = RfdRevisionStore::list( - &*self.storage, - RfdRevisionFilter::default().rfd(Some(vec![rfd.id])), - &ListPagination::default().limit(1), - ) - .await - .to_resource_result()?; - - match revisions.into_iter().nth(0) { - Some(revision) => Ok(revision), - None => resource_not_found(), - } - } else { - resource_not_found() - } - } else { - resource_restricted() - } + self.view_rfd_revision(caller, rfd_number, None).await } #[instrument(skip(self, caller, content))] @@ -739,7 +633,7 @@ impl RfdContext { ]) { let mut rfds = RfdStore::list( &*self.storage, - RfdFilter::default().rfd_number(Some(vec![rfd_number])), + vec![RfdFilter::default().rfd_number(Some(vec![rfd_number]))], &ListPagination::default().limit(1), ) .await @@ -767,18 +661,9 @@ impl RfdContext { #[cfg(test)] pub(crate) mod test_mocks { - use async_trait::async_trait; - use newtype_uuid::TypedUuid; use rand::RngCore; use rfd_data::content::RfdTemplate; - use rfd_model::{ - storage::{ - JobStore, MockJobStore, MockRfdPdfStore, MockRfdRevisionMetaStore, - MockRfdRevisionStore, MockRfdStore, RfdPdfStore, RfdRevisionMetaStore, - RfdRevisionStore, RfdStore, - }, - NewJob, NewRfd, NewRfdPdf, NewRfdRevision, RfdId, RfdPdfId, RfdRevisionId, - }; + use rfd_model::storage::mock::MockStorage; use rsa::{ pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding}, RsaPrivateKey, RsaPublicKey, @@ -789,7 +674,7 @@ pub(crate) mod test_mocks { endpoints::login::oauth::{google::GoogleOAuthProvider, OAuthProviderName}, VContext, }; - use v_model::storage::{postgres::PostgresStore, ListPagination, StoreError}; + use v_model::storage::postgres::PostgresStore; use crate::config::{ ContentConfig, GitHubAuthConfig, GitHubConfig, SearchConfig, ServicesConfig, @@ -874,220 +759,4 @@ pub(crate) mod test_mocks { ctx } - - // Construct a mock storage engine that can be wrapped in an ApiContext for testing - pub struct MockStorage { - pub rfd_store: Option>, - pub rfd_revision_store: Option>, - pub rfd_pdf_store: Option>, - pub rfd_revision_meta_store: Option>, - pub job_store: Option>, - } - - impl MockStorage { - pub fn new() -> Self { - Self { - rfd_store: None, - rfd_revision_store: None, - rfd_pdf_store: None, - rfd_revision_meta_store: None, - job_store: None, - } - } - } - - #[async_trait] - impl RfdStore for MockStorage { - async fn get( - &self, - id: &TypedUuid, - deleted: bool, - ) -> Result, StoreError> { - self.rfd_store.as_ref().unwrap().get(id, deleted).await - } - - async fn list( - &self, - filter: rfd_model::storage::RfdFilter, - pagination: &ListPagination, - ) -> Result, StoreError> { - self.rfd_store - .as_ref() - .unwrap() - .list(filter, pagination) - .await - } - - async fn upsert(&self, new_rfd: NewRfd) -> Result { - self.rfd_store.as_ref().unwrap().upsert(new_rfd).await - } - - async fn delete( - &self, - id: &TypedUuid, - ) -> Result, StoreError> { - self.rfd_store.as_ref().unwrap().delete(id).await - } - } - - #[async_trait] - impl RfdRevisionStore for MockStorage { - async fn get( - &self, - id: &TypedUuid, - deleted: bool, - ) -> Result, StoreError> { - self.rfd_revision_store - .as_ref() - .unwrap() - .get(id, deleted) - .await - } - - async fn list( - &self, - filter: rfd_model::storage::RfdRevisionFilter, - pagination: &ListPagination, - ) -> Result, StoreError> { - self.rfd_revision_store - .as_ref() - .unwrap() - .list(filter, pagination) - .await - } - - async fn list_unique_rfd( - &self, - filter: rfd_model::storage::RfdRevisionFilter, - pagination: &ListPagination, - ) -> Result, StoreError> { - self.rfd_revision_store - .as_ref() - .unwrap() - .list(filter, pagination) - .await - } - - async fn upsert( - &self, - new_revision: NewRfdRevision, - ) -> Result { - self.rfd_revision_store - .as_ref() - .unwrap() - .upsert(new_revision) - .await - } - - async fn delete( - &self, - id: &TypedUuid, - ) -> Result, StoreError> { - self.rfd_revision_store.as_ref().unwrap().delete(id).await - } - } - - #[async_trait] - impl RfdRevisionMetaStore for MockStorage { - async fn get( - &self, - id: &TypedUuid, - deleted: bool, - ) -> Result, StoreError> { - self.rfd_revision_meta_store - .as_ref() - .unwrap() - .get(id, deleted) - .await - } - - async fn list( - &self, - filter: rfd_model::storage::RfdRevisionFilter, - pagination: &ListPagination, - ) -> Result, StoreError> { - self.rfd_revision_meta_store - .as_ref() - .unwrap() - .list(filter, pagination) - .await - } - - async fn list_unique_rfd( - &self, - filter: rfd_model::storage::RfdRevisionFilter, - pagination: &ListPagination, - ) -> Result, StoreError> { - self.rfd_revision_meta_store - .as_ref() - .unwrap() - .list(filter, pagination) - .await - } - } - - #[async_trait] - impl RfdPdfStore for MockStorage { - async fn get( - &self, - id: &TypedUuid, - deleted: bool, - ) -> Result, StoreError> { - self.rfd_pdf_store.as_ref().unwrap().get(id, deleted).await - } - - async fn list( - &self, - filter: rfd_model::storage::RfdPdfFilter, - pagination: &ListPagination, - ) -> Result, StoreError> { - self.rfd_pdf_store - .as_ref() - .unwrap() - .list(filter, pagination) - .await - } - - async fn upsert(&self, new_pdf: NewRfdPdf) -> Result { - self.rfd_pdf_store.as_ref().unwrap().upsert(new_pdf).await - } - - async fn delete( - &self, - id: &TypedUuid, - ) -> Result, StoreError> { - self.rfd_pdf_store.as_ref().unwrap().delete(id).await - } - } - - #[async_trait] - impl JobStore for MockStorage { - async fn get(&self, id: i32) -> Result, StoreError> { - self.job_store.as_ref().unwrap().get(id).await - } - - async fn list( - &self, - filter: rfd_model::storage::JobFilter, - pagination: &ListPagination, - ) -> Result, StoreError> { - self.job_store - .as_ref() - .unwrap() - .list(filter, pagination) - .await - } - - async fn upsert(&self, new_job: NewJob) -> Result { - self.job_store.as_ref().unwrap().upsert(new_job).await - } - - async fn start(&self, id: i32) -> Result, StoreError> { - self.job_store.as_ref().unwrap().start(id).await - } - - async fn complete(&self, id: i32) -> Result, StoreError> { - self.job_store.as_ref().unwrap().complete(id).await - } - } } diff --git a/rfd-api/src/endpoints/meta.rs b/rfd-api/src/endpoints/meta.rs index 4bfef56d..88a0faca 100644 --- a/rfd-api/src/endpoints/meta.rs +++ b/rfd-api/src/endpoints/meta.rs @@ -11,7 +11,7 @@ use v_api::ApiContext; use v_model::permissions::Caller; use crate::{ - context::{RfdContext, RfdMeta}, + context::{RfdContext, RfdWithoutContent}, permissions::RfdPermission, util::response::client_error, }; @@ -29,24 +29,24 @@ pub struct RfdPathParams { path = "/meta/rfd/{number}", }] #[instrument(skip(rqctx), fields(request_id = rqctx.request_id), err(Debug))] -pub async fn get_rfd_meta( +pub async fn view_rfd_meta( rqctx: RequestContext, path: Path, -) -> Result, HttpError> { +) -> Result, HttpError> { let ctx = rqctx.context(); let caller = ctx.v_ctx().get_caller(&rqctx).await?; - get_rfd_meta_op(ctx, &caller, path.into_inner().number).await + view_rfd_meta_op(ctx, &caller, path.into_inner().number).await } #[instrument(skip(ctx, caller), fields(caller = ?caller.id), err(Debug))] -async fn get_rfd_meta_op( +async fn view_rfd_meta_op( ctx: &RfdContext, caller: &Caller, number: String, -) -> Result, HttpError> { +) -> Result, HttpError> { if let Ok(rfd_number) = number.parse::() { Ok(HttpResponseOk( - ctx.get_rfd_meta(caller, rfd_number, None).await?, + ctx.view_rfd_meta(caller, rfd_number, None).await?, )) } else { Err(client_error( @@ -65,19 +65,17 @@ mod tests { use http::StatusCode; use newtype_uuid::{GenericUuid, TypedUuid}; use rfd_model::{ - storage::{MockRfdRevisionMetaStore, MockRfdStore}, - Rfd, RfdRevisionMeta, + schema_ext::ContentFormat, + storage::{mock::MockStorage, MockRfdMetaStore, MockRfdRevisionMetaStore, MockRfdStore}, + CommitSha, FileSha, Rfd, RfdMeta, RfdRevision, RfdRevisionMeta, }; use uuid::Uuid; use v_api::ApiContext; use v_model::{permissions::Caller, Permissions}; use crate::{ - context::{ - test_mocks::{mock_context, MockStorage}, - RfdContext, - }, - endpoints::meta::get_rfd_meta_op, + context::{test_mocks::mock_context, RfdContext}, + endpoints::meta::view_rfd_meta_op, permissions::RfdPermission, }; @@ -93,6 +91,23 @@ mod tests { id: TypedUuid::from_untyped_uuid(private_rfd_id_1), rfd_number: 123, link: None, + content: RfdRevision { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content: String::new(), + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, created_at: Utc::now(), updated_at: Utc::now(), deleted_at: None, @@ -102,6 +117,23 @@ mod tests { id: TypedUuid::from_untyped_uuid(public_rfd_id), rfd_number: 456, link: None, + content: RfdRevision { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content: String::new(), + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, created_at: Utc::now(), updated_at: Utc::now(), deleted_at: None, @@ -111,6 +143,116 @@ mod tests { id: TypedUuid::from_untyped_uuid(private_rfd_id_2), rfd_number: 789, link: None, + content: RfdRevision { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content: String::new(), + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + visibility: rfd_model::schema_ext::Visibility::Private, + }, + ]; + + results.retain(|rfd| { + filter.len() == 0 + || filter[0].rfd_number.is_none() + || filter[0] + .rfd_number + .as_ref() + .unwrap() + .contains(&rfd.rfd_number) + }); + + Ok(results) + }); + + let mut rfd_meta_store = MockRfdMetaStore::new(); + rfd_meta_store.expect_list().returning(move |filter, _| { + let mut results = vec![ + RfdMeta { + id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + rfd_number: 123, + link: None, + content: RfdRevisionMeta { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + visibility: rfd_model::schema_ext::Visibility::Private, + }, + RfdMeta { + id: TypedUuid::from_untyped_uuid(public_rfd_id), + rfd_number: 456, + link: None, + content: RfdRevisionMeta { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + visibility: rfd_model::schema_ext::Visibility::Public, + }, + RfdMeta { + id: TypedUuid::from_untyped_uuid(private_rfd_id_2), + rfd_number: 789, + link: None, + content: RfdRevisionMeta { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, created_at: Utc::now(), updated_at: Utc::now(), deleted_at: None, @@ -119,8 +261,9 @@ mod tests { ]; results.retain(|rfd| { - filter.rfd_number.is_none() - || filter + filter.len() == 0 + || filter[0].rfd_number.is_none() + || filter[0] .rfd_number .as_ref() .unwrap() @@ -186,7 +329,9 @@ mod tests { ]; results.retain(|revision| { - filter.rfd.is_none() || filter.rfd.as_ref().unwrap().contains(&revision.rfd_id) + filter.len() == 0 + || filter[0].rfd.is_none() + || filter[0].rfd.as_ref().unwrap().contains(&revision.rfd_id) }); Ok(results) @@ -194,6 +339,7 @@ mod tests { let mut storage = MockStorage::new(); storage.rfd_store = Some(Arc::new(rfd_store)); + storage.rfd_meta_store = Some(Arc::new(rfd_meta_store)); storage.rfd_revision_meta_store = Some(Arc::new(rfd_revision_meta_store)); mock_context(storage).await @@ -206,12 +352,12 @@ mod tests { let ctx = ctx().await; let caller = Caller::from(Permissions::from(vec![RfdPermission::GetRfdsAll])); - let HttpResponseOk(rfd) = get_rfd_meta_op(&ctx, &caller, "0123".to_string()) + let HttpResponseOk(rfd) = view_rfd_meta_op(&ctx, &caller, "0123".to_string()) .await .unwrap(); assert_eq!(123, rfd.rfd_number); - let HttpResponseOk(rfd) = get_rfd_meta_op(&ctx, &caller, "0456".to_string()) + let HttpResponseOk(rfd) = view_rfd_meta_op(&ctx, &caller, "0456".to_string()) .await .unwrap(); assert_eq!(456, rfd.rfd_number); @@ -224,12 +370,12 @@ mod tests { let ctx = ctx().await; let caller = Caller::from(Permissions::from(vec![RfdPermission::GetRfd(123)])); - let HttpResponseOk(rfd) = get_rfd_meta_op(&ctx, &caller, "0123".to_string()) + let HttpResponseOk(rfd) = view_rfd_meta_op(&ctx, &caller, "0123".to_string()) .await .unwrap(); assert_eq!(123, rfd.rfd_number); - let HttpResponseOk(rfd) = get_rfd_meta_op(&ctx, &caller, "0456".to_string()) + let HttpResponseOk(rfd) = view_rfd_meta_op(&ctx, &caller, "0456".to_string()) .await .unwrap(); assert_eq!(456, rfd.rfd_number); @@ -242,7 +388,7 @@ mod tests { let ctx = ctx().await; let caller = Caller::from(Permissions::::new()); - let result = get_rfd_meta_op(&ctx, &caller, "0123".to_string()).await; + let result = view_rfd_meta_op(&ctx, &caller, "0123".to_string()).await; match result { Err(err) => assert_eq!(StatusCode::NOT_FOUND, err.status_code), @@ -260,7 +406,7 @@ mod tests { let ctx = ctx().await; let caller = ctx.v_ctx().builtin_unauthenticated_caller(); - let result = get_rfd_meta_op(&ctx, &caller, "0123".to_string()).await; + let result = view_rfd_meta_op(&ctx, &caller, "0123".to_string()).await; match result { Err(err) => assert_eq!(StatusCode::NOT_FOUND, err.status_code), Ok(response) => panic!( diff --git a/rfd-api/src/endpoints/rfd.rs b/rfd-api/src/endpoints/rfd.rs index fe06ce1a..6c7c0912 100644 --- a/rfd-api/src/endpoints/rfd.rs +++ b/rfd-api/src/endpoints/rfd.rs @@ -23,7 +23,7 @@ use v_model::permissions::Caller; use crate::{ caller::CallerExt, - context::{FullRfd, RfdContext, RfdMeta}, + context::{RfdContext, RfdWithContent, RfdWithoutContent}, permissions::RfdPermission, search::{MeiliSearchResult, SearchRequest}, util::response::{client_error, internal_error, unauthorized}, @@ -36,19 +36,19 @@ use crate::{ path = "/rfd", }] #[instrument(skip(rqctx), fields(request_id = rqctx.request_id), err(Debug))] -pub async fn get_rfds( +pub async fn list_rfds( rqctx: RequestContext, -) -> Result>, HttpError> { +) -> Result>, HttpError> { let ctx = rqctx.context(); let caller = ctx.v_ctx().get_caller(&rqctx).await?; - get_rfds_op(ctx, &caller).await + list_rfds_op(ctx, &caller).await } #[instrument(skip(ctx, caller), fields(caller = ?caller.id), err(Debug))] -async fn get_rfds_op( +async fn list_rfds_op( ctx: &RfdContext, caller: &Caller, -) -> Result>, HttpError> { +) -> Result>, HttpError> { let rfds = ctx.list_rfds(caller, None).await?; Ok(HttpResponseOk(rfds)) } @@ -107,23 +107,25 @@ pub struct RfdPathParams { path = "/rfd/{number}", }] #[instrument(skip(rqctx), fields(request_id = rqctx.request_id), err(Debug))] -pub async fn get_rfd( +pub async fn view_rfd( rqctx: RequestContext, path: Path, -) -> Result, HttpError> { +) -> Result, HttpError> { let ctx = rqctx.context(); let caller = ctx.v_ctx().get_caller(&rqctx).await?; - get_rfd_op(ctx, &caller, path.into_inner().number).await + view_rfd_op(ctx, &caller, path.into_inner().number).await } #[instrument(skip(ctx, caller), fields(caller = ?caller.id), err(Debug))] -async fn get_rfd_op( +async fn view_rfd_op( ctx: &RfdContext, caller: &Caller, number: String, -) -> Result, HttpError> { +) -> Result, HttpError> { if let Ok(rfd_number) = number.parse::() { - Ok(HttpResponseOk(ctx.get_rfd(caller, rfd_number, None).await?)) + Ok(HttpResponseOk( + ctx.view_rfd(caller, rfd_number, None).await?, + )) } else { Err(client_error( ClientErrorStatusCode::BAD_REQUEST, @@ -257,25 +259,25 @@ pub enum RfdAttr { path = "/rfd/{number}/attr/{attr}", }] #[instrument(skip(rqctx), fields(request_id = rqctx.request_id), err(Debug))] -pub async fn get_rfd_attr( +pub async fn view_rfd_attr( rqctx: RequestContext, path: Path, ) -> Result, HttpError> { let ctx = rqctx.context(); let caller = ctx.v_ctx().get_caller(&rqctx).await?; let path = path.into_inner(); - get_rfd_attr_op(ctx, &caller, path.number, path.attr).await + view_rfd_attr_op(ctx, &caller, path.number, path.attr).await } #[instrument(skip(ctx, caller), fields(caller = ?caller.id), err(Debug))] -async fn get_rfd_attr_op( +async fn view_rfd_attr_op( ctx: &RfdContext, caller: &Caller, number: String, attr: RfdAttrName, ) -> Result, HttpError> { if let Ok(rfd_number) = number.parse::() { - let rfd = ctx.get_rfd(caller, rfd_number, None).await?; + let rfd = ctx.view_rfd(caller, rfd_number, None).await?; let content = match rfd.format { ContentFormat::Asciidoc => RfdContent::Asciidoc(RfdAsciidoc::new(rfd.content)), ContentFormat::Markdown => RfdContent::Markdown(RfdMarkdown::new(rfd.content)), @@ -326,7 +328,7 @@ async fn set_rfd_attr_op( ) -> Result, HttpError> { if let Ok(rfd_number) = number.parse::() { // Get the latest revision - let revision = ctx.get_rfd_revision(caller, rfd_number, None).await?; + let revision = ctx.view_rfd_revision(caller, rfd_number, None).await?; // TODO: Get rid of these clones let mut content = match revision.content_format { @@ -648,23 +650,24 @@ mod tests { use http::StatusCode; use newtype_uuid::{GenericUuid, TypedUuid}; use rfd_model::{ - storage::{MockRfdPdfStore, MockRfdRevisionMetaStore, MockRfdRevisionStore, MockRfdStore}, - Rfd, RfdRevision, RfdRevisionMeta, + schema_ext::ContentFormat, + storage::{ + mock::MockStorage, MockRfdMetaStore, MockRfdPdfStore, MockRfdRevisionMetaStore, + MockRfdRevisionStore, MockRfdStore, + }, + CommitSha, FileSha, Rfd, RfdMeta, RfdRevision, RfdRevisionMeta, }; use uuid::Uuid; use v_api::ApiContext; use v_model::{permissions::Caller, Permissions}; use crate::{ - context::{ - test_mocks::{mock_context, MockStorage}, - RfdContext, - }, - endpoints::rfd::get_rfd_op, + context::{test_mocks::mock_context, RfdContext}, + endpoints::rfd::view_rfd_op, permissions::RfdPermission, }; - use super::get_rfds_op; + use super::list_rfds_op; async fn ctx() -> RfdContext { let private_rfd_id_1 = Uuid::new_v4(); @@ -678,6 +681,23 @@ mod tests { id: TypedUuid::from_untyped_uuid(private_rfd_id_1), rfd_number: 123, link: None, + content: RfdRevision { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content: String::new(), + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, created_at: Utc::now(), updated_at: Utc::now(), deleted_at: None, @@ -687,6 +707,23 @@ mod tests { id: TypedUuid::from_untyped_uuid(public_rfd_id), rfd_number: 456, link: None, + content: RfdRevision { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content: String::new(), + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, created_at: Utc::now(), updated_at: Utc::now(), deleted_at: None, @@ -696,6 +733,116 @@ mod tests { id: TypedUuid::from_untyped_uuid(private_rfd_id_2), rfd_number: 789, link: None, + content: RfdRevision { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content: String::new(), + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + visibility: rfd_model::schema_ext::Visibility::Private, + }, + ]; + + results.retain(|rfd| { + filter.len() == 0 + || filter[0].rfd_number.is_none() + || filter[0] + .rfd_number + .as_ref() + .unwrap() + .contains(&rfd.rfd_number) + }); + + Ok(results) + }); + + let mut rfd_meta_store = MockRfdMetaStore::new(); + rfd_meta_store.expect_list().returning(move |filter, _| { + let mut results = vec![ + RfdMeta { + id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + rfd_number: 123, + link: None, + content: RfdRevisionMeta { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + visibility: rfd_model::schema_ext::Visibility::Private, + }, + RfdMeta { + id: TypedUuid::from_untyped_uuid(public_rfd_id), + rfd_number: 456, + link: None, + content: RfdRevisionMeta { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + visibility: rfd_model::schema_ext::Visibility::Public, + }, + RfdMeta { + id: TypedUuid::from_untyped_uuid(private_rfd_id_2), + rfd_number: 789, + link: None, + content: RfdRevisionMeta { + id: TypedUuid::new_v4(), + rfd_id: TypedUuid::from_untyped_uuid(private_rfd_id_1), + title: String::new(), + state: None, + discussion: None, + authors: None, + labels: None, + content_format: ContentFormat::Asciidoc, + sha: FileSha(String::new()), + commit: CommitSha(String::new()), + committed_at: Utc::now(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }, created_at: Utc::now(), updated_at: Utc::now(), deleted_at: None, @@ -704,8 +851,9 @@ mod tests { ]; results.retain(|rfd| { - filter.rfd_number.is_none() - || filter + filter.len() == 0 + || filter[0].rfd_number.is_none() + || filter[0] .rfd_number .as_ref() .unwrap() @@ -777,7 +925,8 @@ mod tests { ]; results.retain(|revision| { - filter.rfd.is_none() || filter.rfd.as_ref().unwrap().contains(&revision.rfd_id) + filter[0].rfd.is_none() + || filter[0].rfd.as_ref().unwrap().contains(&revision.rfd_id) }); Ok(results) @@ -839,7 +988,9 @@ mod tests { ]; results.retain(|revision| { - filter.rfd.is_none() || filter.rfd.as_ref().unwrap().contains(&revision.rfd_id) + filter.len() == 0 + || filter[0].rfd.is_none() + || filter[0].rfd.as_ref().unwrap().contains(&revision.rfd_id) }); Ok(results) @@ -852,6 +1003,7 @@ mod tests { let mut storage = MockStorage::new(); storage.rfd_store = Some(Arc::new(rfd_store)); + storage.rfd_meta_store = Some(Arc::new(rfd_meta_store)); storage.rfd_revision_store = Some(Arc::new(rfd_revision_store)); storage.rfd_revision_meta_store = Some(Arc::new(rfd_revision_meta_store)); storage.rfd_pdf_store = Some(Arc::new(rfd_pdf_store)); @@ -866,7 +1018,7 @@ mod tests { let ctx = ctx().await; let caller = Caller::from(Permissions::from(vec![RfdPermission::GetRfdsAll])); - let HttpResponseOk(rfds) = get_rfds_op(&ctx, &caller).await.unwrap(); + let HttpResponseOk(rfds) = list_rfds_op(&ctx, &caller).await.unwrap(); assert_eq!(3, rfds.len()); assert_eq!(789, rfds[0].rfd_number); assert_eq!(456, rfds[1].rfd_number); @@ -874,14 +1026,18 @@ mod tests { } #[tokio::test] - async fn get_rfd_via_all_permission() { + async fn view_rfd_via_all_permission() { let ctx = ctx().await; let caller = Caller::from(Permissions::from(vec![RfdPermission::GetRfdsAll])); - let HttpResponseOk(rfd) = get_rfd_op(&ctx, &caller, "0123".to_string()).await.unwrap(); + let HttpResponseOk(rfd) = view_rfd_op(&ctx, &caller, "0123".to_string()) + .await + .unwrap(); assert_eq!(123, rfd.rfd_number); - let HttpResponseOk(rfd) = get_rfd_op(&ctx, &caller, "0456".to_string()).await.unwrap(); + let HttpResponseOk(rfd) = view_rfd_op(&ctx, &caller, "0456".to_string()) + .await + .unwrap(); assert_eq!(456, rfd.rfd_number); } @@ -892,21 +1048,25 @@ mod tests { let ctx = ctx().await; let caller = Caller::from(Permissions::from(vec![RfdPermission::GetRfd(123)])); - let HttpResponseOk(rfds) = get_rfds_op(&ctx, &caller).await.unwrap(); + let HttpResponseOk(rfds) = list_rfds_op(&ctx, &caller).await.unwrap(); assert_eq!(2, rfds.len()); assert_eq!(456, rfds[0].rfd_number); assert_eq!(123, rfds[1].rfd_number); } #[tokio::test] - async fn get_rfd_with_direct_permission() { + async fn view_rfd_with_direct_permission() { let ctx = ctx().await; let caller = Caller::from(Permissions::from(vec![RfdPermission::GetRfd(123)])); - let HttpResponseOk(rfd) = get_rfd_op(&ctx, &caller, "0123".to_string()).await.unwrap(); + let HttpResponseOk(rfd) = view_rfd_op(&ctx, &caller, "0123".to_string()) + .await + .unwrap(); assert_eq!(123, rfd.rfd_number); - let HttpResponseOk(rfd) = get_rfd_op(&ctx, &caller, "0456".to_string()).await.unwrap(); + let HttpResponseOk(rfd) = view_rfd_op(&ctx, &caller, "0456".to_string()) + .await + .unwrap(); assert_eq!(456, rfd.rfd_number); } @@ -917,17 +1077,17 @@ mod tests { let ctx = ctx().await; let caller = Caller::from(Permissions::::new()); - let HttpResponseOk(rfds) = get_rfds_op(&ctx, &caller).await.unwrap(); + let HttpResponseOk(rfds) = list_rfds_op(&ctx, &caller).await.unwrap(); assert_eq!(1, rfds.len()); assert_eq!(456, rfds[0].rfd_number); } #[tokio::test] - async fn get_rfd_without_permission() { + async fn view_rfd_without_permission() { let ctx = ctx().await; let caller = Caller::from(Permissions::::new()); - let result = get_rfd_op(&ctx, &caller, "0123".to_string()).await; + let result = view_rfd_op(&ctx, &caller, "0123".to_string()).await; match result { Err(err) => assert_eq!(StatusCode::NOT_FOUND, err.status_code), @@ -944,19 +1104,20 @@ mod tests { async fn list_rfds_as_unauthenticated() { let ctx = ctx().await; - let HttpResponseOk(rfds) = get_rfds_op(&ctx, &ctx.v_ctx().builtin_unauthenticated_caller()) - .await - .unwrap(); + let HttpResponseOk(rfds) = + list_rfds_op(&ctx, &ctx.v_ctx().builtin_unauthenticated_caller()) + .await + .unwrap(); assert_eq!(1, rfds.len()); assert_eq!(456, rfds[0].rfd_number); } #[tokio::test] - async fn get_rfd_as_unauthenticated() { + async fn view_rfd_as_unauthenticated() { let ctx = ctx().await; let caller = ctx.v_ctx().builtin_unauthenticated_caller(); - let result = get_rfd_op(&ctx, &caller, "0123".to_string()).await; + let result = view_rfd_op(&ctx, &caller, "0123".to_string()).await; match result { Err(err) => assert_eq!(StatusCode::NOT_FOUND, err.status_code), Ok(response) => panic!( diff --git a/rfd-api/src/main.rs b/rfd-api/src/main.rs index e11cae4b..41ebc0cb 100644 --- a/rfd-api/src/main.rs +++ b/rfd-api/src/main.rs @@ -3,7 +3,6 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use context::RfdContext; -use rfd_model::storage::postgres::PostgresStore; use server::{server, ServerConfig}; use std::{ net::{SocketAddr, SocketAddrV4}, @@ -18,7 +17,7 @@ use v_api::{ }, ApiContext, VContext, }; -use v_model::storage::postgres::PostgresStore as VApiPostgressStore; +use v_model::storage::postgres::PostgresStore as VApiPostgresStore; use crate::{ config::{AppConfig, ServerLogFormat}, @@ -69,7 +68,7 @@ async fn main() -> anyhow::Result<()> { let mut v_ctx = VContext::new( config.public_url.clone(), Arc::new( - VApiPostgressStore::new(&config.database_url) + VApiPostgresStore::new(&config.database_url) .await .tap_err(|err| { tracing::error!(?err, "Failed to establish initial database connection"); @@ -120,7 +119,7 @@ async fn main() -> anyhow::Result<()> { let context = RfdContext::new( config.public_url, Arc::new( - PostgresStore::new(&config.database_url) + VApiPostgresStore::new(&config.database_url) .await .tap_err(|err| { tracing::error!(?err, "Failed to establish initial database connection"); diff --git a/rfd-api/src/server.rs b/rfd-api/src/server.rs index afacad13..ddb726a3 100644 --- a/rfd-api/src/server.rs +++ b/rfd-api/src/server.rs @@ -15,10 +15,10 @@ use v_api::{inject_endpoints, v_system_endpoints}; use crate::{ context::RfdContext, endpoints::{ - meta::get_rfd_meta, + meta::view_rfd_meta, rfd::{ - discuss_rfd, get_rfd, get_rfd_attr, get_rfds, publish_rfd, reserve_rfd, search_rfds, - set_rfd_attr, set_rfd_content, set_rfd_document, update_rfd_visibility, + discuss_rfd, list_rfds, publish_rfd, reserve_rfd, search_rfds, set_rfd_attr, + set_rfd_content, set_rfd_document, update_rfd_visibility, view_rfd, view_rfd_attr, }, webhook::github_webhook, }, @@ -74,9 +74,10 @@ pub fn server( inject_endpoints!(api); // RFDs - api.register(get_rfds).expect("Failed to register endpoint"); - api.register(get_rfd).expect("Failed to register endpoint"); - api.register(get_rfd_meta) + api.register(list_rfds) + .expect("Failed to register endpoint"); + api.register(view_rfd).expect("Failed to register endpoint"); + api.register(view_rfd_meta) .expect("Failed to register endpoint"); api.register(reserve_rfd) .expect("Failed to register endpoint"); @@ -84,7 +85,7 @@ pub fn server( .expect("Failed to register endpoint"); api.register(set_rfd_content) .expect("Failed to register endpoint"); - api.register(get_rfd_attr) + api.register(view_rfd_attr) .expect("Failed to register endpoint"); api.register(set_rfd_attr) .expect("Failed to register endpoint"); diff --git a/rfd-cli/src/main.rs b/rfd-cli/src/main.rs index a3ccc6ab..9453ed97 100644 --- a/rfd-cli/src/main.rs +++ b/rfd-cli/src/main.rs @@ -409,7 +409,7 @@ impl ProgenitorCliConfig for Context { .printer() .unwrap() .output_oauth_secret(reserialize(value)), - "Array_of_ListRfd" => self.printer().unwrap().output_rfd_list(reserialize(value)), + "Array_of_RfdMeta" => self.printer().unwrap().output_rfd_list(reserialize(value)), "FullRfd" => self.printer().unwrap().output_rfd_full(reserialize(value)), "Rfd" => self.printer().unwrap().output_rfd(reserialize(value)), "SearchResults" => self diff --git a/rfd-github/src/lib.rs b/rfd-github/src/lib.rs index 542595f4..cc31a773 100644 --- a/rfd-github/src/lib.rs +++ b/rfd-github/src/lib.rs @@ -730,6 +730,28 @@ impl GitHubRfdLocation { } } +#[derive(Clone)] +struct GitHubPullRequestComments { + pub client: Client, +} + +impl GitHubPullRequestComments { + async fn comments(&self) { + let pulls = self.client.pulls(); + let comments = pulls + .list_all_review_comments( + "owner", + "repo", + 0, + octorust::types::Sort::Created, + octorust::types::Order::Desc, + None, + ) + .await + .unwrap(); + } +} + struct FetchedRfdContent { decoded: Vec, parsed: String, diff --git a/rfd-model/src/db.rs b/rfd-model/src/db.rs index 84333931..92aac45c 100644 --- a/rfd-model/src/db.rs +++ b/rfd-model/src/db.rs @@ -3,7 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use chrono::{DateTime, Utc}; -use diesel::{Insertable, Queryable}; +use diesel::{Insertable, Queryable, Selectable}; use partial_struct::partial; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -13,7 +13,7 @@ use crate::{ schema_ext::{ContentFormat, PdfSource, Visibility}, }; -#[derive(Debug, Deserialize, Serialize, Queryable, Insertable)] +#[derive(Debug, Deserialize, Serialize, Queryable, Insertable, Selectable)] #[diesel(table_name = rfd)] pub struct RfdModel { pub id: Uuid, @@ -26,7 +26,7 @@ pub struct RfdModel { } #[partial(RfdRevisionMetaModel)] -#[derive(Debug, Deserialize, Serialize, Queryable, Insertable)] +#[derive(Debug, Deserialize, Serialize, Queryable, Insertable, Selectable)] #[diesel(table_name = rfd_revision)] pub struct RfdRevisionModel { pub id: Uuid, diff --git a/rfd-model/src/lib.rs b/rfd-model/src/lib.rs index 72b647d8..0dc53323 100644 --- a/rfd-model/src/lib.rs +++ b/rfd-model/src/lib.rs @@ -63,12 +63,16 @@ impl TypedUuidKind for RfdId { } #[partial(NewRfd)] +#[partial(RfdMeta)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct Rfd { pub id: TypedUuid, pub rfd_number: i32, pub link: Option, #[partial(NewRfd(skip))] + #[partial(RfdMeta(retype = RfdRevisionMeta))] + pub content: RfdRevision, + #[partial(NewRfd(skip))] pub created_at: DateTime, #[partial(NewRfd(skip))] pub updated_at: DateTime, @@ -77,16 +81,32 @@ pub struct Rfd { pub visibility: Visibility, } -impl From for Rfd { - fn from(value: RfdModel) -> Self { +impl From<(RfdModel, RfdRevisionModel)> for Rfd { + fn from((rfd, revision): (RfdModel, RfdRevisionModel)) -> Self { Self { - id: TypedUuid::from_untyped_uuid(value.id), - rfd_number: value.rfd_number, - link: value.link, - created_at: value.created_at, - updated_at: value.updated_at, - deleted_at: value.deleted_at, - visibility: value.visibility, + id: TypedUuid::from_untyped_uuid(rfd.id), + rfd_number: rfd.rfd_number, + link: rfd.link, + content: revision.into(), + created_at: rfd.created_at, + updated_at: rfd.updated_at, + deleted_at: rfd.deleted_at, + visibility: rfd.visibility, + } + } +} + +impl From<(RfdModel, RfdRevisionMetaModel)> for RfdMeta { + fn from((rfd, revision): (RfdModel, RfdRevisionMetaModel)) -> Self { + Self { + id: TypedUuid::from_untyped_uuid(rfd.id), + rfd_number: rfd.rfd_number, + link: rfd.link, + content: revision.into(), + created_at: rfd.created_at, + updated_at: rfd.updated_at, + deleted_at: rfd.deleted_at, + visibility: rfd.visibility, } } } diff --git a/rfd-model/src/storage/mock.rs b/rfd-model/src/storage/mock.rs new file mode 100644 index 00000000..cf3f829c --- /dev/null +++ b/rfd-model/src/storage/mock.rs @@ -0,0 +1,236 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use async_trait::async_trait; +use newtype_uuid::TypedUuid; +use std::sync::Arc; +use v_model::storage::StoreError; + +use crate::{ + Job, NewJob, NewRfd, NewRfdPdf, NewRfdRevision, Rfd, RfdId, RfdMeta, RfdPdf, RfdPdfId, + RfdRevision, RfdRevisionId, RfdRevisionMeta, +}; + +use super::{ + JobFilter, JobStore, ListPagination, MockJobStore, MockRfdMetaStore, MockRfdPdfStore, + MockRfdRevisionMetaStore, MockRfdRevisionStore, MockRfdStore, RfdFilter, RfdMetaStore, + RfdPdfFilter, RfdPdfStore, RfdRevisionFilter, RfdRevisionMetaStore, RfdRevisionStore, RfdStore, +}; + +pub struct MockStorage { + pub rfd_store: Option>, + pub rfd_meta_store: Option>, + pub rfd_revision_store: Option>, + pub rfd_revision_meta_store: Option>, + pub rfd_pdf_store: Option>, + pub job_store: Option>, +} + +impl MockStorage { + pub fn new() -> Self { + Self { + rfd_store: None, + rfd_meta_store: None, + rfd_revision_store: None, + rfd_revision_meta_store: None, + rfd_pdf_store: None, + job_store: None, + } + } +} + +#[async_trait] +impl RfdStore for MockStorage { + async fn get( + &self, + id: &TypedUuid, + revision: Option>, + deleted: bool, + ) -> Result, StoreError> { + self.rfd_store + .as_ref() + .unwrap() + .get(id, revision, deleted) + .await + } + + async fn list( + &self, + filters: Vec, + pagination: &ListPagination, + ) -> Result, StoreError> { + self.rfd_store + .as_ref() + .unwrap() + .list(filters, pagination) + .await + } + + async fn upsert(&self, new_rfd: NewRfd) -> Result { + self.rfd_store.as_ref().unwrap().upsert(new_rfd).await + } + + async fn delete(&self, id: &TypedUuid) -> Result, StoreError> { + self.rfd_store.as_ref().unwrap().delete(id).await + } +} + +#[async_trait] +impl RfdMetaStore for MockStorage { + async fn get( + &self, + id: TypedUuid, + revision: Option>, + deleted: bool, + ) -> Result, StoreError> { + self.rfd_meta_store + .as_ref() + .unwrap() + .get(id, revision, deleted) + .await + } + + async fn list( + &self, + filters: Vec, + pagination: &ListPagination, + ) -> Result, StoreError> { + self.rfd_meta_store + .as_ref() + .unwrap() + .list(filters, pagination) + .await + } +} + +#[async_trait] +impl RfdRevisionStore for MockStorage { + async fn get( + &self, + id: &TypedUuid, + deleted: bool, + ) -> Result, StoreError> { + self.rfd_revision_store + .as_ref() + .unwrap() + .get(id, deleted) + .await + } + + async fn list( + &self, + filters: Vec, + pagination: &ListPagination, + ) -> Result, StoreError> { + self.rfd_revision_store + .as_ref() + .unwrap() + .list(filters, pagination) + .await + } + + async fn upsert(&self, new_revision: NewRfdRevision) -> Result { + self.rfd_revision_store + .as_ref() + .unwrap() + .upsert(new_revision) + .await + } + + async fn delete( + &self, + id: &TypedUuid, + ) -> Result, StoreError> { + self.rfd_revision_store.as_ref().unwrap().delete(id).await + } +} + +#[async_trait] +impl RfdRevisionMetaStore for MockStorage { + async fn get( + &self, + id: &TypedUuid, + deleted: bool, + ) -> Result, StoreError> { + self.rfd_revision_meta_store + .as_ref() + .unwrap() + .get(id, deleted) + .await + } + + async fn list( + &self, + filters: Vec, + pagination: &ListPagination, + ) -> Result, StoreError> { + self.rfd_revision_meta_store + .as_ref() + .unwrap() + .list(filters, pagination) + .await + } +} + +#[async_trait] +impl RfdPdfStore for MockStorage { + async fn get( + &self, + id: &TypedUuid, + deleted: bool, + ) -> Result, StoreError> { + self.rfd_pdf_store.as_ref().unwrap().get(id, deleted).await + } + + async fn list( + &self, + filters: Vec, + pagination: &ListPagination, + ) -> Result, StoreError> { + self.rfd_pdf_store + .as_ref() + .unwrap() + .list(filters, pagination) + .await + } + + async fn upsert(&self, new_pdf: NewRfdPdf) -> Result { + self.rfd_pdf_store.as_ref().unwrap().upsert(new_pdf).await + } + + async fn delete(&self, id: &TypedUuid) -> Result, StoreError> { + self.rfd_pdf_store.as_ref().unwrap().delete(id).await + } +} + +#[async_trait] +impl JobStore for MockStorage { + async fn get(&self, id: i32) -> Result, StoreError> { + self.job_store.as_ref().unwrap().get(id).await + } + + async fn list( + &self, + filters: Vec, + pagination: &ListPagination, + ) -> Result, StoreError> { + self.job_store + .as_ref() + .unwrap() + .list(filters, pagination) + .await + } + + async fn upsert(&self, new_job: NewJob) -> Result { + self.job_store.as_ref().unwrap().upsert(new_job).await + } + + async fn start(&self, id: i32) -> Result, StoreError> { + self.job_store.as_ref().unwrap().start(id).await + } + + async fn complete(&self, id: i32) -> Result, StoreError> { + self.job_store.as_ref().unwrap().complete(id).await + } +} diff --git a/rfd-model/src/storage/mod.rs b/rfd-model/src/storage/mod.rs index 8eb00d60..fe63a0fb 100644 --- a/rfd-model/src/storage/mod.rs +++ b/rfd-model/src/storage/mod.rs @@ -12,16 +12,45 @@ use std::fmt::Debug; use v_model::storage::{ListPagination, StoreError}; use crate::{ - schema_ext::PdfSource, Job, NewJob, NewRfd, NewRfdPdf, NewRfdRevision, Rfd, RfdId, RfdPdf, - RfdPdfId, RfdRevision, RfdRevisionId, RfdRevisionMeta, + schema_ext::PdfSource, CommitSha, Job, NewJob, NewRfd, NewRfdPdf, NewRfdRevision, Rfd, RfdId, + RfdMeta, RfdPdf, RfdPdfId, RfdRevision, RfdRevisionId, RfdRevisionMeta, }; +#[cfg(feature = "mock")] +pub mod mock; pub mod postgres; +pub trait RfdStorage: + RfdStore + + RfdMetaStore + + RfdRevisionStore + + RfdRevisionMetaStore + + RfdPdfStore + + JobStore + + Send + + Sync + + 'static +{ +} +impl RfdStorage for T where + T: RfdStore + + RfdMetaStore + + RfdRevisionStore + + RfdRevisionMetaStore + + RfdPdfStore + + JobStore + + Send + + Sync + + 'static +{ +} + #[derive(Debug, Default)] pub struct RfdFilter { pub id: Option>>, + pub revision: Option>>, pub rfd_number: Option>, + pub commit: Option>, pub public: Option, pub deleted: bool, } @@ -32,11 +61,21 @@ impl RfdFilter { self } + pub fn revision(mut self, revision: Option>>) -> Self { + self.revision = revision; + self + } + pub fn rfd_number(mut self, rfd_number: Option>) -> Self { self.rfd_number = rfd_number; self } + pub fn commit(mut self, commit: Option>) -> Self { + self.commit = commit; + self + } + pub fn public(mut self, public: Option) -> Self { self.public = public; self @@ -51,16 +90,37 @@ impl RfdFilter { #[cfg_attr(feature = "mock", automock)] #[async_trait] pub trait RfdStore { - async fn get(&self, id: &TypedUuid, deleted: bool) -> Result, StoreError>; + async fn get( + &self, + id: &TypedUuid, + revision: Option>, + deleted: bool, + ) -> Result, StoreError>; async fn list( &self, - filter: RfdFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError>; async fn upsert(&self, new_rfd: NewRfd) -> Result; async fn delete(&self, id: &TypedUuid) -> Result, StoreError>; } +#[cfg_attr(feature = "mock", automock)] +#[async_trait] +pub trait RfdMetaStore { + async fn get( + &self, + id: TypedUuid, + revision: Option>, + deleted: bool, + ) -> Result, StoreError>; + async fn list( + &self, + filters: Vec, + pagination: &ListPagination, + ) -> Result, StoreError>; +} + // TODO: Make the revision store generic over a revision type. We want to be able to have a metadata // only version of the revision model so that we do not need to always load content from the db @@ -68,7 +128,7 @@ pub trait RfdStore { pub struct RfdRevisionFilter { pub id: Option>>, pub rfd: Option>>, - pub sha: Option>, + pub commit: Option>, pub deleted: bool, } @@ -83,8 +143,8 @@ impl RfdRevisionFilter { self } - pub fn sha(mut self, sha: Option>) -> Self { - self.sha = sha; + pub fn commit(mut self, commit: Option>) -> Self { + self.commit = commit; self } @@ -111,14 +171,14 @@ pub trait RfdRevisionStore { ) -> Result, StoreError>; async fn list( &self, - filter: RfdRevisionFilter, - pagination: &ListPagination, - ) -> Result, StoreError>; - async fn list_unique_rfd( - &self, - filter: RfdRevisionFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError>; + // async fn list_unique_rfd( + // &self, + // filters: Vec, + // pagination: &ListPagination, + // ) -> Result, StoreError>; async fn upsert(&self, new_revision: NewRfdRevision) -> Result; async fn delete( &self, @@ -136,14 +196,14 @@ pub trait RfdRevisionMetaStore { ) -> Result, StoreError>; async fn list( &self, - filter: RfdRevisionFilter, - pagination: &ListPagination, - ) -> Result, StoreError>; - async fn list_unique_rfd( - &self, - filter: RfdRevisionFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError>; + // async fn list_unique_rfd( + // &self, + // filter: RfdRevisionFilter, + // pagination: &ListPagination, + // ) -> Result, StoreError>; } #[derive(Debug, Default)] @@ -198,7 +258,7 @@ pub trait RfdPdfStore { ) -> Result, StoreError>; async fn list( &self, - filter: RfdPdfFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError>; async fn upsert(&self, new_revision: NewRfdPdf) -> Result; @@ -241,7 +301,7 @@ pub trait JobStore { async fn get(&self, id: i32) -> Result, StoreError>; async fn list( &self, - filter: JobFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError>; async fn upsert(&self, new_job: NewJob) -> Result; diff --git a/rfd-model/src/storage/postgres.rs b/rfd-model/src/storage/postgres.rs index 6d35b2b0..37ba2596 100644 --- a/rfd-model/src/storage/postgres.rs +++ b/rfd-model/src/storage/postgres.rs @@ -2,74 +2,50 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionError, ConnectionManager}; +use async_bb8_diesel::AsyncRunQueryDsl; use async_trait::async_trait; -use bb8::Pool; use chrono::Utc; use diesel::{ debug_query, insert_into, - pg::PgConnection, + pg::Pg, query_dsl::QueryDsl, + sql_types::Bool, update, upsert::{excluded, on_constraint}, - ExpressionMethods, + BoolExpressionMethods, BoxableExpression, ExpressionMethods, SelectableHelper, }; use newtype_uuid::{GenericUuid, TypedUuid}; -use std::time::Duration; -use thiserror::Error; use uuid::Uuid; +use v_model::storage::postgres::PostgresStore; use crate::{ db::{JobModel, RfdModel, RfdPdfModel, RfdRevisionMetaModel, RfdRevisionModel}, schema::{job, rfd, rfd_pdf, rfd_revision}, schema_ext::Visibility, storage::StoreError, - Job, NewJob, NewRfd, NewRfdPdf, NewRfdRevision, Rfd, RfdId, RfdPdf, RfdPdfId, RfdRevision, - RfdRevisionId, RfdRevisionMeta, + Job, NewJob, NewRfd, NewRfdPdf, NewRfdRevision, Rfd, RfdId, RfdMeta, RfdPdf, RfdPdfId, + RfdRevision, RfdRevisionId, RfdRevisionMeta, }; use super::{ - JobFilter, JobStore, ListPagination, RfdFilter, RfdPdfFilter, RfdPdfStore, RfdRevisionFilter, - RfdRevisionMetaStore, RfdRevisionStore, RfdStore, + JobFilter, JobStore, ListPagination, RfdFilter, RfdMetaStore, RfdPdfFilter, RfdPdfStore, + RfdRevisionFilter, RfdRevisionMetaStore, RfdRevisionStore, RfdStore, }; -pub type DbPool = Pool>; - -pub struct PostgresStore { - pool: DbPool, -} - -#[derive(Debug, Error)] -pub enum PostgresError { - #[error("Failed to connect to database")] - Connection(ConnectionError), -} - -impl From for PostgresError { - fn from(error: ConnectionError) -> Self { - PostgresError::Connection(error) - } -} - -impl PostgresStore { - pub async fn new(url: &str) -> Result { - let manager = ConnectionManager::::new(url); - - Ok(Self { - pool: Pool::builder() - .connection_timeout(Duration::from_secs(30)) - .build(manager) - .await?, - }) - } -} - #[async_trait] impl RfdStore for PostgresStore { - async fn get(&self, id: &TypedUuid, deleted: bool) -> Result, StoreError> { + async fn get( + &self, + id: &TypedUuid, + revision: Option>, + deleted: bool, + ) -> Result, StoreError> { let rfd = RfdStore::list( self, - RfdFilter::default().id(Some(vec![*id])).deleted(deleted), + vec![RfdFilter::default() + .id(Some(vec![*id])) + .revision(revision.map(|rev| vec![rev])) + .deleted(deleted)], &ListPagination::default().limit(1), ) .await?; @@ -78,46 +54,81 @@ impl RfdStore for PostgresStore { async fn list( &self, - filter: RfdFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError> { - let mut query = rfd::dsl::rfd.into_boxed(); - - tracing::trace!(?filter, "Lookup RFDs"); - - let RfdFilter { - id, - rfd_number, - public, - deleted, - } = filter; - - if let Some(id) = id { - query = - query.filter(rfd::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid))); - } - - if let Some(rfd_number) = rfd_number { - query = query.filter(rfd::rfd_number.eq_any(rfd_number)); - } + let mut query = rfd::table + .inner_join(rfd_revision::table) + .distinct_on(rfd::id) + .into_boxed(); - if let Some(public) = public { - query = query.filter( - rfd::visibility.eq(public - .then(|| Visibility::Public) - .unwrap_or(Visibility::Private)), - ); - } + tracing::trace!(?filters, "Lookup RFDs"); - if !deleted { - query = query.filter(rfd::deleted_at.is_null()); + let filter_predicates = filters + .into_iter() + .map(|filter| { + let mut predicates: Vec>> = vec![]; + let RfdFilter { + id, + revision, + rfd_number, + commit, + public, + deleted, + } = filter; + + if let Some(id) = id { + predicates.push(Box::new( + rfd::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid)), + )); + } + + if let Some(revision) = revision { + predicates + .push(Box::new(rfd_revision::id.eq_any( + revision.into_iter().map(GenericUuid::into_untyped_uuid), + ))); + } + + if let Some(rfd_number) = rfd_number { + predicates.push(Box::new(rfd::rfd_number.eq_any(rfd_number))); + } + + if let Some(commit) = commit { + predicates.push(Box::new( + rfd_revision::commit_sha.eq_any(commit.into_iter().map(|sha| sha.0)), + )); + } + + if let Some(public) = public { + predicates.push(Box::new( + rfd::visibility.eq(public + .then(|| Visibility::Public) + .unwrap_or(Visibility::Private)), + )); + } + + if !deleted { + predicates.push(Box::new(rfd::deleted_at.is_null())); + predicates.push(Box::new(rfd_revision::deleted_at.is_null())); + } + + predicates + }) + .collect::>(); + + if let Some(predicate) = flatten_predicates(filter_predicates) { + query = query.filter(predicate); } let results = query .offset(pagination.offset) .limit(pagination.limit) - .order(rfd::rfd_number.desc()) - .get_results_async::(&*self.pool.get().await?) + .order(( + rfd_revision::rfd_id.asc(), + rfd_revision::committed_at.desc(), + )) + .get_results_async::<(RfdModel, RfdRevisionModel)>(&*self.pool.get().await?) .await?; tracing::trace!(count = ?results.len(), "Found RFDs"); @@ -126,7 +137,7 @@ impl RfdStore for PostgresStore { } async fn upsert(&self, new_rfd: NewRfd) -> Result { - let rfd: RfdModel = insert_into(rfd::dsl::rfd) + let _: RfdModel = insert_into(rfd::dsl::rfd) .values(( rfd::id.eq(new_rfd.id.into_untyped_uuid()), rfd::rfd_number.eq(new_rfd.rfd_number.clone()), @@ -144,7 +155,11 @@ impl RfdStore for PostgresStore { .get_result_async(&*self.pool.get().await?) .await?; - Ok(rfd.into()) + // There is a race condition here than case a failure where a delete occurs between + // the upsert and the get + Ok(RfdStore::get(self, &new_rfd.id, None, false) + .await? + .unwrap()) } async fn delete(&self, id: &TypedUuid) -> Result, StoreError> { @@ -154,136 +169,196 @@ impl RfdStore for PostgresStore { .execute_async(&*self.pool.get().await?) .await?; - RfdStore::get(self, id, true).await + RfdStore::get(self, id, None, true).await } } #[async_trait] -impl RfdRevisionStore for PostgresStore { +impl RfdMetaStore for PostgresStore { async fn get( &self, - id: &TypedUuid, + id: TypedUuid, + revision: Option>, deleted: bool, - ) -> Result, StoreError> { - let user = RfdRevisionStore::list( + ) -> Result, StoreError> { + let rfd = RfdMetaStore::list( self, - RfdRevisionFilter::default() - .id(Some(vec![*id])) - .deleted(deleted), + vec![RfdFilter::default() + .id(Some(vec![id])) + .revision(revision.map(|rev| vec![rev])) + .deleted(deleted)], &ListPagination::default().limit(1), ) .await?; - Ok(user.into_iter().nth(0)) + Ok(rfd.into_iter().nth(0)) } async fn list( &self, - filter: RfdRevisionFilter, + filters: Vec, pagination: &ListPagination, - ) -> Result, StoreError> { - let mut query = rfd_revision::dsl::rfd_revision.into_boxed(); - - tracing::trace!(?filter, "Lookup RFD revisions"); - - let RfdRevisionFilter { - id, - rfd, - sha, - deleted, - } = filter; - - if let Some(id) = id { - query = query.filter( - rfd_revision::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } - - if let Some(rfd) = rfd { - query = query.filter( - rfd_revision::rfd_id.eq_any(rfd.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } + ) -> Result, StoreError> { + let mut query = rfd::table + .inner_join(rfd_revision::table) + .distinct_on(rfd::id) + .select((RfdModel::as_select(), RfdRevisionMetaModel::as_select())) + .into_boxed(); - if let Some(sha) = sha { - query = query.filter(rfd_revision::sha.eq_any(sha)); - } + tracing::trace!(?filters, "Lookup RFDs"); - if !deleted { - query = query.filter(rfd_revision::deleted_at.is_null()); + let filter_predicates = filters + .into_iter() + .map(|filter| { + let mut predicates: Vec>> = vec![]; + let RfdFilter { + id, + revision, + rfd_number, + commit, + public, + deleted, + } = filter; + + if let Some(id) = id { + predicates.push(Box::new( + rfd::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid)), + )); + } + + if let Some(revision) = revision { + predicates + .push(Box::new(rfd_revision::id.eq_any( + revision.into_iter().map(GenericUuid::into_untyped_uuid), + ))); + } + + if let Some(rfd_number) = rfd_number { + predicates.push(Box::new(rfd::rfd_number.eq_any(rfd_number))); + } + + if let Some(commit) = commit { + predicates.push(Box::new( + rfd_revision::commit_sha.eq_any(commit.into_iter().map(|sha| sha.0)), + )); + } + + if let Some(public) = public { + predicates.push(Box::new( + rfd::visibility.eq(public + .then(|| Visibility::Public) + .unwrap_or(Visibility::Private)), + )); + } + + if !deleted { + predicates.push(Box::new(rfd::deleted_at.is_null())); + predicates.push(Box::new(rfd_revision::deleted_at.is_null())); + } + + predicates + }) + .collect::>(); + + if let Some(predicate) = flatten_predicates(filter_predicates) { + query = query.filter(predicate); } - let query = query + let results = query .offset(pagination.offset) .limit(pagination.limit) - .order(rfd_revision::committed_at.desc()); + .order(( + rfd_revision::rfd_id.asc(), + rfd_revision::committed_at.desc(), + )) + .get_results_async::<(RfdModel, RfdRevisionMetaModel)>(&*self.pool.get().await?) + .await?; - tracing::info!(query = ?debug_query(&query), "Run list rfds"); + tracing::trace!(count = ?results.len(), "Found RFDs"); - let results = query - .get_results_async::(&*self.pool.get().await?) - .await?; + Ok(results.into_iter().map(|rfd| rfd.into()).collect()) + } +} - Ok(results - .into_iter() - .map(|revision| revision.into()) - .collect()) +#[async_trait] +impl RfdRevisionStore for PostgresStore { + async fn get( + &self, + id: &TypedUuid, + deleted: bool, + ) -> Result, StoreError> { + let user = RfdRevisionStore::list( + self, + vec![RfdRevisionFilter::default() + .id(Some(vec![*id])) + .deleted(deleted)], + &ListPagination::default().limit(1), + ) + .await?; + Ok(user.into_iter().nth(0)) } - // TODO: Refactor into a group by arg in list. Diesel types here are a pain - async fn list_unique_rfd( + async fn list( &self, - filter: RfdRevisionFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError> { - let mut query = rfd_revision::dsl::rfd_revision - .distinct_on(rfd_revision::rfd_id) - .into_boxed(); - - tracing::trace!(rfd_ids = ?filter.rfd.as_ref().map(|list| list.len()), "Lookup unique RFD revisions"); - - let RfdRevisionFilter { - id, - rfd, - sha, - deleted, - } = filter; - - if let Some(id) = id { - query = query.filter( - rfd_revision::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } + let mut query = rfd_revision::dsl::rfd_revision.into_boxed(); - if let Some(rfd) = rfd { - query = query.filter( - rfd_revision::rfd_id.eq_any(rfd.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } + tracing::trace!(?filters, "Lookup RFD revisions"); - if let Some(sha) = sha { - query = query.filter(rfd_revision::sha.eq_any(sha)); - } - - if !deleted { - query = query.filter(rfd_revision::deleted_at.is_null()); + let filter_predicates = filters + .into_iter() + .map(|filter| { + let mut predicates: Vec>> = vec![]; + let RfdRevisionFilter { + id, + rfd, + commit, + deleted, + } = filter; + + if let Some(id) = id { + predicates.push(Box::new( + rfd_revision::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid)), + )); + } + + if let Some(rfd) = rfd { + predicates.push(Box::new( + rfd_revision::rfd_id + .eq_any(rfd.into_iter().map(GenericUuid::into_untyped_uuid)), + )); + } + + if let Some(commit) = commit { + predicates.push(Box::new( + rfd_revision::commit_sha.eq_any(commit.into_iter().map(|sha| sha.0)), + )); + } + + if !deleted { + predicates.push(Box::new(rfd_revision::deleted_at.is_null())); + } + + predicates + }) + .collect::>(); + + if let Some(predicate) = flatten_predicates(filter_predicates) { + query = query.filter(predicate); } let query = query .offset(pagination.offset) .limit(pagination.limit) - .order(( - rfd_revision::rfd_id.asc(), - rfd_revision::committed_at.desc(), - )); + .order(rfd_revision::committed_at.desc()); - tracing::info!(query = ?debug_query(&query), "Run list unique rfds"); + tracing::info!(query = ?debug_query(&query), "Run list rfds"); let results = query .get_results_async::(&*self.pool.get().await?) .await?; - tracing::trace!(count = ?results.len(), "Found unique RFD revisions"); - Ok(results .into_iter() .map(|revision| revision.into()) @@ -351,9 +426,9 @@ impl RfdRevisionMetaStore for PostgresStore { ) -> Result, StoreError> { let user = RfdRevisionMetaStore::list( self, - RfdRevisionFilter::default() + vec![RfdRevisionFilter::default() .id(Some(vec![*id])) - .deleted(deleted), + .deleted(deleted)], &ListPagination::default().limit(1), ) .await?; @@ -362,7 +437,7 @@ impl RfdRevisionMetaStore for PostgresStore { async fn list( &self, - filter: RfdRevisionFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError> { let mut query = rfd_revision::dsl::rfd_revision @@ -384,33 +459,48 @@ impl RfdRevisionMetaStore for PostgresStore { )) .into_boxed(); - tracing::trace!(?filter, "Lookup RFD revision metadata"); - - let RfdRevisionFilter { - id, - rfd, - sha, - deleted, - } = filter; - - if let Some(id) = id { - query = query.filter( - rfd_revision::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } - - if let Some(rfd) = rfd { - query = query.filter( - rfd_revision::rfd_id.eq_any(rfd.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } - - if let Some(sha) = sha { - query = query.filter(rfd_revision::sha.eq_any(sha)); - } + tracing::trace!(?filters, "Lookup RFD revision metadata"); - if !deleted { - query = query.filter(rfd_revision::deleted_at.is_null()); + let filter_predicates = filters + .into_iter() + .map(|filter| { + let mut predicates: Vec>> = vec![]; + let RfdRevisionFilter { + id, + rfd, + commit, + deleted, + } = filter; + + if let Some(id) = id { + predicates.push(Box::new( + rfd_revision::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid)), + )); + } + + if let Some(rfd) = rfd { + predicates.push(Box::new( + rfd_revision::rfd_id + .eq_any(rfd.into_iter().map(GenericUuid::into_untyped_uuid)), + )); + } + + if let Some(commit) = commit { + predicates.push(Box::new( + rfd_revision::commit_sha.eq_any(commit.into_iter().map(|sha| sha.0)), + )); + } + + if !deleted { + predicates.push(Box::new(rfd_revision::deleted_at.is_null())); + } + + predicates + }) + .collect::>(); + + if let Some(predicate) = flatten_predicates(filter_predicates) { + query = query.filter(predicate); } let query = query @@ -431,83 +521,6 @@ impl RfdRevisionMetaStore for PostgresStore { .map(|revision| revision.into()) .collect()) } - - // TODO: Refactor into a group by arg in list. Diesel types here are a pain - async fn list_unique_rfd( - &self, - filter: RfdRevisionFilter, - pagination: &ListPagination, - ) -> Result, StoreError> { - let mut query = rfd_revision::dsl::rfd_revision - .select(( - rfd_revision::id, - rfd_revision::rfd_id, - rfd_revision::title, - rfd_revision::state, - rfd_revision::discussion, - rfd_revision::authors, - rfd_revision::content_format, - rfd_revision::sha, - rfd_revision::commit_sha, - rfd_revision::committed_at, - rfd_revision::created_at, - rfd_revision::updated_at, - rfd_revision::deleted_at, - rfd_revision::labels, - )) - .distinct_on(rfd_revision::rfd_id) - .into_boxed(); - - tracing::trace!(rfd_ids = ?filter.rfd.as_ref().map(|list| list.len()), "Lookup unique RFD revision metadata"); - - let RfdRevisionFilter { - id, - rfd, - sha, - deleted, - } = filter; - - if let Some(id) = id { - query = query.filter( - rfd_revision::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } - - if let Some(rfd) = rfd { - query = query.filter( - rfd_revision::rfd_id.eq_any(rfd.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } - - if let Some(sha) = sha { - query = query.filter(rfd_revision::sha.eq_any(sha)); - } - - if !deleted { - query = query.filter(rfd_revision::deleted_at.is_null()); - } - - let query = query - .offset(pagination.offset) - .limit(pagination.limit) - .order(( - rfd_revision::rfd_id.asc(), - rfd_revision::committed_at.desc(), - )); - - tracing::info!(query = ?debug_query(&query), "Run list unique rfd metadata"); - - let results = query - .get_results_async::(&*self.pool.get().await?) - .await?; - - tracing::trace!(count = ?results.len(), "Found unique RFD revision metadata"); - - Ok(results - .into_iter() - .map(|revision| revision.into()) - .collect()) - } } #[async_trait] @@ -519,7 +532,7 @@ impl RfdPdfStore for PostgresStore { ) -> Result, StoreError> { let user = RfdPdfStore::list( self, - RfdPdfFilter::default().id(Some(vec![*id])).deleted(deleted), + vec![RfdPdfFilter::default().id(Some(vec![*id])).deleted(deleted)], &ListPagination::default().limit(1), ) .await?; @@ -528,50 +541,63 @@ impl RfdPdfStore for PostgresStore { async fn list( &self, - filter: super::RfdPdfFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError> { let mut query = rfd_pdf::dsl::rfd_pdf.into_boxed(); - tracing::trace!(?filter, "Lookup RFD pdfs"); - - let RfdPdfFilter { - id, - rfd_revision, - source, - deleted, - rfd, - external_id, - } = filter; - - if let Some(id) = id { - query = query - .filter(rfd_pdf::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid))); - } - - if let Some(rfd_revision) = rfd_revision { - query = query.filter( - rfd_pdf::rfd_revision_id - .eq_any(rfd_revision.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } - - if let Some(source) = source { - query = query.filter(rfd_pdf::source.eq_any(source)); - } - - if let Some(rfd) = rfd { - query = query.filter( - rfd_pdf::rfd_id.eq_any(rfd.into_iter().map(GenericUuid::into_untyped_uuid)), - ); - } - - if let Some(external_id) = external_id { - query = query.filter(rfd_pdf::external_id.eq_any(external_id)); - } + tracing::trace!(?filters, "Lookup RFD pdfs"); - if !deleted { - query = query.filter(rfd_pdf::deleted_at.is_null()); + let filter_predicates = filters + .into_iter() + .map(|filter| { + let mut predicates: Vec>> = vec![]; + let RfdPdfFilter { + id, + rfd_revision, + source, + deleted, + rfd, + external_id, + } = filter; + + if let Some(id) = id { + predicates.push(Box::new( + rfd_pdf::id.eq_any(id.into_iter().map(GenericUuid::into_untyped_uuid)), + )); + } + + if let Some(rfd_revision) = rfd_revision { + predicates + .push(Box::new(rfd_pdf::rfd_revision_id.eq_any( + rfd_revision.into_iter().map(GenericUuid::into_untyped_uuid), + ))); + } + + if let Some(source) = source { + predicates.push(Box::new(rfd_pdf::source.eq_any(source))); + } + + if let Some(rfd) = rfd { + predicates.push(Box::new( + rfd_pdf::rfd_id.eq_any(rfd.into_iter().map(GenericUuid::into_untyped_uuid)), + )); + } + + if let Some(external_id) = external_id { + predicates.push(Box::new(rfd_pdf::external_id.eq_any(external_id))); + } + + if !deleted { + predicates.push(Box::new(rfd_pdf::deleted_at.is_null())); + } + + predicates + }) + .collect::>(); + + if let Some(predicate) = flatten_predicates(filter_predicates) { + query = query.filter(predicate); } let results = query @@ -621,7 +647,7 @@ impl JobStore for PostgresStore { async fn get(&self, id: i32) -> Result, StoreError> { let user = JobStore::list( self, - JobFilter::default().id(Some(vec![id])), + vec![JobFilter::default().id(Some(vec![id]))], &ListPagination::default().limit(1), ) .await?; @@ -630,36 +656,47 @@ impl JobStore for PostgresStore { async fn list( &self, - filter: super::JobFilter, + filters: Vec, pagination: &ListPagination, ) -> Result, StoreError> { let mut query = job::dsl::job.into_boxed(); - - let JobFilter { - id, - sha, - processed, - started, - } = filter; - - if let Some(id) = id { - query = query.filter(job::id.eq_any(id)); - } - - if let Some(sha) = sha { - query = query.filter(job::sha.eq_any(sha)); - } - - if let Some(processed) = processed { - query = query.filter(job::processed.eq(processed)); - } - - if let Some(started) = started { - if started { - query = query.filter(job::started_at.is_not_null()); - } else { - query = query.filter(job::started_at.is_null()); - } + let filter_predicates = filters + .into_iter() + .map(|filter| { + let mut predicates: Vec>> = vec![]; + let JobFilter { + id, + sha, + processed, + started, + } = filter; + + if let Some(id) = id { + predicates.push(Box::new(job::id.eq_any(id))); + } + + if let Some(sha) = sha { + predicates.push(Box::new(job::sha.eq_any(sha))); + } + + if let Some(processed) = processed { + predicates.push(Box::new(job::processed.eq(processed))); + } + + if let Some(started) = started { + if started { + predicates.push(Box::new(job::started_at.is_not_null())); + } else { + predicates.push(Box::new(job::started_at.is_null())); + } + } + + predicates + }) + .collect::>(); + + if let Some(predicate) = flatten_predicates(filter_predicates) { + query = query.filter(predicate); } let results = query @@ -714,3 +751,25 @@ impl JobStore for PostgresStore { JobStore::get(self, id).await } } + +fn flatten_predicates( + predicates: Vec>>>, +) -> Option>> +where + T: 'static, +{ + let mut filter_predicates = vec![]; + + for p in predicates { + let flat = p + .into_iter() + .reduce(|combined, entry| Box::new(combined.and(entry))); + if let Some(flat) = flat { + filter_predicates.push(flat); + } + } + + filter_predicates + .into_iter() + .reduce(|combined, entry| Box::new(combined.or(entry))) +} diff --git a/rfd-processor/src/context.rs b/rfd-processor/src/context.rs index 1d32fd9d..3e6a6dd0 100644 --- a/rfd-processor/src/context.rs +++ b/rfd-processor/src/context.rs @@ -19,7 +19,7 @@ use octorust::{ }; use reqwest::Error as ReqwestError; use rfd_github::{GitHubError, GitHubRfdRepo}; -use rfd_model::{schema_ext::PdfSource, storage::postgres::PostgresStore}; +use rfd_model::schema_ext::PdfSource; use rsa::{ pkcs1::{DecodeRsaPrivateKey, EncodeRsaPrivateKey}, RsaPrivateKey, @@ -27,6 +27,7 @@ use rsa::{ use tap::TapFallible; use thiserror::Error; use tracing::instrument; +use v_model::storage::postgres::PostgresStore; use crate::{ pdf::{PdfFileLocation, PdfStorage, RfdPdf, RfdPdfError}, diff --git a/rfd-processor/src/processor.rs b/rfd-processor/src/processor.rs index d73814dc..b0d8174d 100644 --- a/rfd-processor/src/processor.rs +++ b/rfd-processor/src/processor.rs @@ -34,9 +34,9 @@ pub async fn processor(ctx: Arc) -> Result<(), JobError> { if ctx.processor.enabled { let jobs = JobStore::list( &ctx.db.storage, - JobFilter::default() + vec![JobFilter::default() .processed(Some(false)) - .started(Some(false)), + .started(Some(false))], &pagination, ) .await?; diff --git a/rfd-processor/src/rfd.rs b/rfd-processor/src/rfd.rs index 5cb4bf5c..e6fe0b12 100644 --- a/rfd-processor/src/rfd.rs +++ b/rfd-processor/src/rfd.rs @@ -56,7 +56,7 @@ impl PersistedRfd { { let existing_rfd = RfdStore::list( storage, - RfdFilter::default().rfd_number(Some(vec![number.into()])), + vec![RfdFilter::default().rfd_number(Some(vec![number.into()]))], &ListPagination::latest(), ) .await? @@ -66,7 +66,7 @@ impl PersistedRfd { if let Some(rfd) = existing_rfd { let most_recent_revision = RfdRevisionStore::list( storage, - RfdRevisionFilter::default().rfd(Some(vec![rfd.id])), + vec![RfdRevisionFilter::default().rfd(Some(vec![rfd.id]))], &ListPagination::latest(), ) .await? @@ -75,7 +75,7 @@ impl PersistedRfd { let most_recent_pdf = RfdPdfStore::list( storage, - RfdPdfFilter::default().rfd(Some(vec![rfd.id])), + vec![RfdPdfFilter::default().rfd(Some(vec![rfd.id]))], &ListPagination::latest(), ) .await? @@ -323,7 +323,7 @@ impl RemoteRfd { let (id, visibility) = RfdStore::list( storage, - RfdFilter::default().rfd_number(Some(vec![payload.number.into()])), + vec![RfdFilter::default().rfd_number(Some(vec![payload.number.into()]))], &ListPagination::latest(), ) .await? @@ -345,9 +345,9 @@ impl RemoteRfd { let id = RfdRevisionStore::list( storage, - RfdRevisionFilter::default() + vec![RfdRevisionFilter::default() .rfd(Some(vec![rfd.id])) - .sha(Some(vec![payload.commit_sha.clone().into()])), + .commit(Some(vec![payload.commit_sha.clone()]))], &ListPagination::latest(), ) .await? @@ -387,7 +387,7 @@ impl RemoteRfd { content: payload.content.raw().to_string(), content_format: payload.content_format, sha: payload.sha, - commit: payload.commit_sha.into(), + commit: payload.commit_sha, committed_at: payload.commit_date, }, ) @@ -395,7 +395,7 @@ impl RemoteRfd { let mut existing_pdf = RfdPdfStore::list( storage, - RfdPdfFilter::default().rfd(Some(vec![rfd.id])), + vec![RfdPdfFilter::default().rfd(Some(vec![rfd.id]))], &ListPagination::latest(), ) .await?;