From ff9a2075dfd6b02a0c8f38062d39e77142937efc Mon Sep 17 00:00:00 2001 From: Nander Stabel Date: Mon, 10 Jun 2024 14:53:59 +0200 Subject: [PATCH] feat: use polling strategy for JIT credentials (#71) * feat: add `offer_id` to Offer events * feat: add `offer_id` to all Offer events * feat: add polling strategy for JIT credentials * feat: add `POLLING_INTERVAL_MS` const * fix: fix failing tests due to blocking thread * fix: undo temporary code changes * fix: increase `sleep` duration to 100ms * docs: add TODO comment --- .env.example | 1 + agent_api_rest/Cargo.toml | 2 +- .../issuance/credential_issuer/credential.rs | 83 ++++++++++++++----- .../verification/relying_party/redirect.rs | 5 +- agent_application/docker/README.md | 3 + agent_event_publisher_http/Cargo.toml | 1 + agent_event_publisher_http/src/lib.rs | 17 ++-- 7 files changed, 82 insertions(+), 30 deletions(-) diff --git a/.env.example b/.env.example index 3d25ad7a..52e78bae 100644 --- a/.env.example +++ b/.env.example @@ -13,3 +13,4 @@ AGENT_CONFIG_DEFAULT_DID_METHOD="did:key" AGENT_STORE_DB_CONNECTION_STRING=postgresql://demo_user:demo_pass@localhost:5432/demo AGENT_CONFIG_DISPLAY_NAME="UniCore" AGENT_CONFIG_DISPLAY_LOGO_URI="http://example.com/logo.png" +AGENT_API_REST_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS=500 diff --git a/agent_api_rest/Cargo.toml b/agent_api_rest/Cargo.toml index 088f0906..9f219681 100644 --- a/agent_api_rest/Cargo.toml +++ b/agent_api_rest/Cargo.toml @@ -20,6 +20,7 @@ oid4vp.workspace = true serde.workspace = true serde_json.workspace = true siopv2.workspace = true +tokio.workspace = true tower-http.workspace = true tracing.workspace = true tracing-subscriber.workspace = true @@ -41,7 +42,6 @@ rstest.workspace = true serde_urlencoded = "0.7" serde_yaml.workspace = true serial_test = "3.0" -tokio.workspace = true tower = { version = "0.4" } tracing-test.workspace = true url.workspace = true diff --git a/agent_api_rest/src/issuance/credential_issuer/credential.rs b/agent_api_rest/src/issuance/credential_issuer/credential.rs index 1cf06d89..5d69b76e 100644 --- a/agent_api_rest/src/issuance/credential_issuer/credential.rs +++ b/agent_api_rest/src/issuance/credential_issuer/credential.rs @@ -1,3 +1,5 @@ +use std::time::{Duration, Instant}; + use agent_issuance::{ credential::{command::CredentialCommand, queries::CredentialView}, offer::{ @@ -7,7 +9,10 @@ use agent_issuance::{ server_config::queries::ServerConfigView, state::{IssuanceState, SERVER_CONFIG_ID}, }; -use agent_shared::handlers::{command_handler, query_handler}; +use agent_shared::{ + config, + handlers::{command_handler, query_handler}, +}; use axum::{ extract::{Json, State}, http::StatusCode, @@ -16,9 +21,11 @@ use axum::{ use axum_auth::AuthBearer; use oid4vci::credential_request::CredentialRequest; use serde_json::json; -use tracing::info; +use tokio::time::sleep; +use tracing::{error, info}; -const EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS: u64 = 250; +const DEFAULT_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS: u128 = 1000; +const POLLING_INTERVAL_MS: u64 = 100; #[axum_macros::debug_handler] pub(crate) async fn credential( @@ -57,18 +64,34 @@ pub(crate) async fn credential( StatusCode::INTERNAL_SERVER_ERROR.into_response(); }; - // This ensures that the server waits for a sufficient duration to potentially receive a credential from an external - // server. - std::thread::sleep(std::time::Duration::from_millis(EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS)); + let timeout = config!("external_server_response_timeout_ms") + .ok() + .and_then(|external_server_response_timeout_ms| external_server_response_timeout_ms.parse().ok()) + .unwrap_or(DEFAULT_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS); + let start_time = Instant::now(); + // TODO: replace this polling solution with a call to the `TxChannelRegistry` as described here: https://github.com/impierce/ssi-agent/issues/75 // Use the `offer_id` to get the `credential_ids` and `subject_id` from the `OfferView`. - let (credential_ids, subject_id) = match query_handler(&offer_id, &state.query.offer).await { - Ok(Some(OfferView { - credential_ids, - subject_id: Some(subject_id), - .. - })) => (credential_ids, subject_id), - _ => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), + let (credential_ids, subject_id) = loop { + match query_handler(&offer_id, &state.query.offer).await { + // When the Offer does not include the credential id's yet, wait for the external server to provide them. + Ok(Some(OfferView { credential_ids, .. })) if credential_ids.is_empty() => { + if start_time.elapsed().as_millis() <= timeout { + sleep(Duration::from_millis(POLLING_INTERVAL_MS)).await; + } else { + error!("timeout failure"); + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + } + Ok(Some(OfferView { + credential_ids, + subject_id: Some(subject_id), + .. + })) => break (credential_ids, subject_id), + _ => { + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + } }; // Use the `credential_ids` and `subject_id` to sign all the credentials. @@ -169,14 +192,24 @@ mod tests { 6jZ4IwAtEwt6XfbV9luFalRL3qtsmDvaNBf7CA"; trait CredentialEventTrigger { - async fn prepare_credential_event_trigger(&self, app: Arc>>, is_self_signed: bool); + async fn prepare_credential_event_trigger( + &self, + app: Arc>>, + is_self_signed: bool, + delay: u128, + ); } // Adds a method to `MockServer` which can be used to mount a mock endpoint that will be triggered when a // `CredentialRequestVerified` event is dispatched from the `UniCore` server. The `MockServer` used in this test // module must be seen as a representation of an outside backend server. impl CredentialEventTrigger for MockServer { - async fn prepare_credential_event_trigger(&self, app: Arc>>, is_self_signed: bool) { + async fn prepare_credential_event_trigger( + &self, + app: Arc>>, + is_self_signed: bool, + delay: u128, + ) { Mock::given(method("POST")) .and(path("/ssi-events-subscriber")) .and( @@ -214,6 +247,8 @@ mod tests { } }; + std::thread::sleep(Duration::from_millis(delay.try_into().unwrap())); + // Sends the `CredentialsRequest` to the `credentials` endpoint. app_clone .oneshot( @@ -242,13 +277,19 @@ mod tests { } #[rstest] - #[case::without_external_server(false, false)] - #[case::with_external_server(true, false)] - #[case::with_external_server_and_self_signed_credential(true, true)] + #[case::without_external_server(false, false, 0)] + #[case::with_external_server(true, false, 0)] + #[case::with_external_server_and_self_signed_credential(true, true, 0)] + #[should_panic(expected = "assertion `left == right` failed\n left: 500\n right: 200")] + #[case::should_panic_due_to_timout(true, false, DEFAULT_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS + 100)] #[serial_test::serial] - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] #[tracing_test::traced_test] - async fn test_credential_endpoint(#[case] with_external_server: bool, #[case] is_self_signed: bool) { + async fn test_credential_endpoint( + #[case] with_external_server: bool, + #[case] is_self_signed: bool, + #[case] delay: u128, + ) { use crate::issuance::credentials::tests::credentials; let (external_server, issuance_event_publishers, verification_event_publishers) = if with_external_server { @@ -293,7 +334,7 @@ mod tests { if let Some(external_server) = &external_server { external_server - .prepare_credential_event_trigger(Arc::new(Mutex::new(Some(app.clone()))), is_self_signed) + .prepare_credential_event_trigger(Arc::new(Mutex::new(Some(app.clone()))), is_self_signed, delay) .await; } diff --git a/agent_api_rest/src/verification/relying_party/redirect.rs b/agent_api_rest/src/verification/relying_party/redirect.rs index 7add9118..17e17208 100644 --- a/agent_api_rest/src/verification/relying_party/redirect.rs +++ b/agent_api_rest/src/verification/relying_party/redirect.rs @@ -135,7 +135,7 @@ pub mod tests { assert_eq!(response.status(), StatusCode::OK); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] #[tracing_test::traced_test] async fn test_redirect_endpoint() { let mock_server = MockServer::start().await; @@ -187,6 +187,9 @@ pub mod tests { request(&mut app, state.clone()).await; redirect(&mut app, state).await; + // Wait for the request to arrive at the mock server endpoint. + std::thread::sleep(std::time::Duration::from_millis(100)); + // Assert that the event was dispatched to the target URL. assert!(mock_server.received_requests().await.unwrap().len() == 1); } diff --git a/agent_application/docker/README.md b/agent_application/docker/README.md index 9d97126a..95f144b6 100644 --- a/agent_application/docker/README.md +++ b/agent_application/docker/README.md @@ -91,3 +91,6 @@ To integrate just-in-time data request events into your workflow, adhere to the ``` 2. Upon initiation of the OpenID4VCI flow by a Wallet, the CredentialRequestVerified event is triggered, containing relevant identifiers. 3. The HTTP Event Publisher dispatches the event to the external system. Leveraging the provided identifiers, the external system generates and signs the credential, then submits it to UniCore's `/v1/credentials` endpoint. Refer to the [API specification](../../agent_api_rest/README.md)) for additional details on endpoint usage. + +By default, UniCore will wait up to 1000 ms for the signed credential to arrive. This parameter can be changed by +setting the `AGENT_API_REST_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS` environment variable. diff --git a/agent_event_publisher_http/Cargo.toml b/agent_event_publisher_http/Cargo.toml index af4aa7d4..4bd9aa40 100644 --- a/agent_event_publisher_http/Cargo.toml +++ b/agent_event_publisher_http/Cargo.toml @@ -21,6 +21,7 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus serde.workspace = true serde_with.workspace = true serde_yaml.workspace = true +tokio.workspace = true [dev-dependencies] agent_event_publisher_http = { path = ".", features = ["test"] } diff --git a/agent_event_publisher_http/src/lib.rs b/agent_event_publisher_http/src/lib.rs index 0bda6fe5..a04476c1 100644 --- a/agent_event_publisher_http/src/lib.rs +++ b/agent_event_publisher_http/src/lib.rs @@ -121,12 +121,12 @@ where async fn dispatch(&self, _view_id: &str, events: &[EventEnvelope]) { for event in events { if self.target_events.contains(&event.payload.event_type()) { - self.client - .post(&self.target_url) - .json(&event.payload) - .send() - .await - .ok(); + let request = self.client.post(&self.target_url).json(&event.payload); + + // Send the request in a separate thread so that we don't have to await the response in the current thread. + tokio::task::spawn(async move { + request.send().await.ok(); + }); } } } @@ -140,7 +140,7 @@ mod tests { use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn it_works() { let mock_server = MockServer::start().await; @@ -187,6 +187,9 @@ mod tests { // Dispatch the event. publisher.offer.as_ref().unwrap().dispatch("view_id", &events).await; + // Wait for the request to arrive at the mock server endpoint. + std::thread::sleep(std::time::Duration::from_millis(100)); + // Assert that the event was dispatched to the target URL. assert_eq!( offer_event,