Skip to content

Commit

Permalink
feat: use polling strategy for JIT credentials (#71)
Browse files Browse the repository at this point in the history
* feat: add `offer_id` to Offer events

* feat: add `offer_id` to all Offer events

* feat: add polling strategy for JIT credentials

* feat: add `POLLING_INTERVAL_MS` const

* fix: fix failing tests due to blocking thread

* fix: undo temporary code changes

* fix: increase `sleep` duration to 100ms

* docs: add TODO comment
  • Loading branch information
nanderstabel authored Jun 10, 2024
1 parent 12437f4 commit ff9a207
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 30 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ AGENT_CONFIG_DEFAULT_DID_METHOD="did:key"
AGENT_STORE_DB_CONNECTION_STRING=postgresql://demo_user:demo_pass@localhost:5432/demo
AGENT_CONFIG_DISPLAY_NAME="UniCore"
AGENT_CONFIG_DISPLAY_LOGO_URI="http://example.com/logo.png"
AGENT_API_REST_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS=500
2 changes: 1 addition & 1 deletion agent_api_rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ oid4vp.workspace = true
serde.workspace = true
serde_json.workspace = true
siopv2.workspace = true
tokio.workspace = true
tower-http.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
Expand All @@ -41,7 +42,6 @@ rstest.workspace = true
serde_urlencoded = "0.7"
serde_yaml.workspace = true
serial_test = "3.0"
tokio.workspace = true
tower = { version = "0.4" }
tracing-test.workspace = true
url.workspace = true
Expand Down
83 changes: 62 additions & 21 deletions agent_api_rest/src/issuance/credential_issuer/credential.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::{Duration, Instant};

use agent_issuance::{
credential::{command::CredentialCommand, queries::CredentialView},
offer::{
Expand All @@ -7,7 +9,10 @@ use agent_issuance::{
server_config::queries::ServerConfigView,
state::{IssuanceState, SERVER_CONFIG_ID},
};
use agent_shared::handlers::{command_handler, query_handler};
use agent_shared::{
config,
handlers::{command_handler, query_handler},
};
use axum::{
extract::{Json, State},
http::StatusCode,
Expand All @@ -16,9 +21,11 @@ use axum::{
use axum_auth::AuthBearer;
use oid4vci::credential_request::CredentialRequest;
use serde_json::json;
use tracing::info;
use tokio::time::sleep;
use tracing::{error, info};

const EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS: u64 = 250;
const DEFAULT_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS: u128 = 1000;
const POLLING_INTERVAL_MS: u64 = 100;

#[axum_macros::debug_handler]
pub(crate) async fn credential(
Expand Down Expand Up @@ -57,18 +64,34 @@ pub(crate) async fn credential(
StatusCode::INTERNAL_SERVER_ERROR.into_response();
};

// This ensures that the server waits for a sufficient duration to potentially receive a credential from an external
// server.
std::thread::sleep(std::time::Duration::from_millis(EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS));
let timeout = config!("external_server_response_timeout_ms")
.ok()
.and_then(|external_server_response_timeout_ms| external_server_response_timeout_ms.parse().ok())
.unwrap_or(DEFAULT_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS);
let start_time = Instant::now();

// TODO: replace this polling solution with a call to the `TxChannelRegistry` as described here: https://github.com/impierce/ssi-agent/issues/75
// Use the `offer_id` to get the `credential_ids` and `subject_id` from the `OfferView`.
let (credential_ids, subject_id) = match query_handler(&offer_id, &state.query.offer).await {
Ok(Some(OfferView {
credential_ids,
subject_id: Some(subject_id),
..
})) => (credential_ids, subject_id),
_ => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
let (credential_ids, subject_id) = loop {
match query_handler(&offer_id, &state.query.offer).await {
// When the Offer does not include the credential id's yet, wait for the external server to provide them.
Ok(Some(OfferView { credential_ids, .. })) if credential_ids.is_empty() => {
if start_time.elapsed().as_millis() <= timeout {
sleep(Duration::from_millis(POLLING_INTERVAL_MS)).await;
} else {
error!("timeout failure");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
}
Ok(Some(OfferView {
credential_ids,
subject_id: Some(subject_id),
..
})) => break (credential_ids, subject_id),
_ => {
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
}
};

// Use the `credential_ids` and `subject_id` to sign all the credentials.
Expand Down Expand Up @@ -169,14 +192,24 @@ mod tests {
6jZ4IwAtEwt6XfbV9luFalRL3qtsmDvaNBf7CA";

trait CredentialEventTrigger {
async fn prepare_credential_event_trigger(&self, app: Arc<Mutex<Option<Router>>>, is_self_signed: bool);
async fn prepare_credential_event_trigger(
&self,
app: Arc<Mutex<Option<Router>>>,
is_self_signed: bool,
delay: u128,
);
}

// Adds a method to `MockServer` which can be used to mount a mock endpoint that will be triggered when a
// `CredentialRequestVerified` event is dispatched from the `UniCore` server. The `MockServer` used in this test
// module must be seen as a representation of an outside backend server.
impl CredentialEventTrigger for MockServer {
async fn prepare_credential_event_trigger(&self, app: Arc<Mutex<Option<Router>>>, is_self_signed: bool) {
async fn prepare_credential_event_trigger(
&self,
app: Arc<Mutex<Option<Router>>>,
is_self_signed: bool,
delay: u128,
) {
Mock::given(method("POST"))
.and(path("/ssi-events-subscriber"))
.and(
Expand Down Expand Up @@ -214,6 +247,8 @@ mod tests {
}
};

std::thread::sleep(Duration::from_millis(delay.try_into().unwrap()));

// Sends the `CredentialsRequest` to the `credentials` endpoint.
app_clone
.oneshot(
Expand Down Expand Up @@ -242,13 +277,19 @@ mod tests {
}

#[rstest]
#[case::without_external_server(false, false)]
#[case::with_external_server(true, false)]
#[case::with_external_server_and_self_signed_credential(true, true)]
#[case::without_external_server(false, false, 0)]
#[case::with_external_server(true, false, 0)]
#[case::with_external_server_and_self_signed_credential(true, true, 0)]
#[should_panic(expected = "assertion `left == right` failed\n left: 500\n right: 200")]
#[case::should_panic_due_to_timout(true, false, DEFAULT_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS + 100)]
#[serial_test::serial]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[tracing_test::traced_test]
async fn test_credential_endpoint(#[case] with_external_server: bool, #[case] is_self_signed: bool) {
async fn test_credential_endpoint(
#[case] with_external_server: bool,
#[case] is_self_signed: bool,
#[case] delay: u128,
) {
use crate::issuance::credentials::tests::credentials;

let (external_server, issuance_event_publishers, verification_event_publishers) = if with_external_server {
Expand Down Expand Up @@ -293,7 +334,7 @@ mod tests {

if let Some(external_server) = &external_server {
external_server
.prepare_credential_event_trigger(Arc::new(Mutex::new(Some(app.clone()))), is_self_signed)
.prepare_credential_event_trigger(Arc::new(Mutex::new(Some(app.clone()))), is_self_signed, delay)
.await;
}

Expand Down
5 changes: 4 additions & 1 deletion agent_api_rest/src/verification/relying_party/redirect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub mod tests {
assert_eq!(response.status(), StatusCode::OK);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[tracing_test::traced_test]
async fn test_redirect_endpoint() {
let mock_server = MockServer::start().await;
Expand Down Expand Up @@ -187,6 +187,9 @@ pub mod tests {
request(&mut app, state.clone()).await;
redirect(&mut app, state).await;

// Wait for the request to arrive at the mock server endpoint.
std::thread::sleep(std::time::Duration::from_millis(100));

// Assert that the event was dispatched to the target URL.
assert!(mock_server.received_requests().await.unwrap().len() == 1);
}
Expand Down
3 changes: 3 additions & 0 deletions agent_application/docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,6 @@ To integrate just-in-time data request events into your workflow, adhere to the
```
2. Upon initiation of the OpenID4VCI flow by a Wallet, the CredentialRequestVerified event is triggered, containing relevant identifiers.
3. The HTTP Event Publisher dispatches the event to the external system. Leveraging the provided identifiers, the external system generates and signs the credential, then submits it to UniCore's `/v1/credentials` endpoint. Refer to the [API specification](../../agent_api_rest/README.md)) for additional details on endpoint usage.
By default, UniCore will wait up to 1000 ms for the signed credential to arrive. This parameter can be changed by
setting the `AGENT_API_REST_EXTERNAL_SERVER_RESPONSE_TIMEOUT_MS` environment variable.
1 change: 1 addition & 0 deletions agent_event_publisher_http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus
serde.workspace = true
serde_with.workspace = true
serde_yaml.workspace = true
tokio.workspace = true

[dev-dependencies]
agent_event_publisher_http = { path = ".", features = ["test"] }
Expand Down
17 changes: 10 additions & 7 deletions agent_event_publisher_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ where
async fn dispatch(&self, _view_id: &str, events: &[EventEnvelope<A>]) {
for event in events {
if self.target_events.contains(&event.payload.event_type()) {
self.client
.post(&self.target_url)
.json(&event.payload)
.send()
.await
.ok();
let request = self.client.post(&self.target_url).json(&event.payload);

// Send the request in a separate thread so that we don't have to await the response in the current thread.
tokio::task::spawn(async move {
request.send().await.ok();
});
}
}
}
Expand All @@ -140,7 +140,7 @@ mod tests {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn it_works() {
let mock_server = MockServer::start().await;

Expand Down Expand Up @@ -187,6 +187,9 @@ mod tests {
// Dispatch the event.
publisher.offer.as_ref().unwrap().dispatch("view_id", &events).await;

// Wait for the request to arrive at the mock server endpoint.
std::thread::sleep(std::time::Duration::from_millis(100));

// Assert that the event was dispatched to the target URL.
assert_eq!(
offer_event,
Expand Down

0 comments on commit ff9a207

Please sign in to comment.