diff --git a/docs/src/api/room/enter.md b/docs/src/api/room/enter.md index 951ff51b..f343e7e4 100644 --- a/docs/src/api/room/enter.md +++ b/docs/src/api/room/enter.md @@ -18,9 +18,10 @@ The tenant authorizes the current _agent_ for `read` action on ## Multicast request -Name | Type | Default | Description ----- | ---- | ---------- | -------------------- -id | uuid | _required_ | The room identifier. +Name | Type | Default | Description +---------------------- | ---- | ---------- | -------------------- +id | uuid | _required_ | The room identifier. +broadcast_subscription | bool | false | Whether also to subscribe to broadcast topic `broadcasts/{EVENT_APP_ID}/api/v1/rooms/:id/events` ## Unicast response diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index b322d968..cf7ca542 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -78,6 +78,8 @@ request_routes!( pub(crate) enum CorrelationData { SubscriptionCreate(subscription::CorrelationDataPayload), SubscriptionDelete(subscription::CorrelationDataPayload), + BroadcastSubscriptionCreate(subscription::CorrelationDataPayload), + BroadcastSubscriptionDelete(subscription::CorrelationDataPayload), } #[async_trait] @@ -112,7 +114,9 @@ macro_rules! response_routes { response_routes!( SubscriptionCreate => subscription::CreateResponseHandler, - SubscriptionDelete => subscription::DeleteResponseHandler + SubscriptionDelete => subscription::DeleteResponseHandler, + BroadcastSubscriptionCreate => subscription::BroadcastCreateResponseHandler, + BroadcastSubscriptionDelete => subscription::BroadcastDeleteResponseHandler ); /////////////////////////////////////////////////////////////////////////////// @@ -150,7 +154,8 @@ macro_rules! event_routes { // Event routes configuration: label => EventHandler event_routes!( "metric.pull" => metric::PullHandler, - "subscription.delete" => subscription::DeleteEventHandler + "subscription.delete" => subscription::DeleteEventHandler, + "broadcast_subscription.delete" => subscription::BroadcastDeleteEventHandler ); /////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index 57a6759d..d16d4b09 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -347,6 +347,8 @@ impl RequestHandler for UpdateHandler { #[derive(Debug, Deserialize)] pub(crate) struct EnterRequest { id: Uuid, + #[serde(default)] + broadcast_subscription: bool, } pub(crate) struct EnterHandler; @@ -403,40 +405,66 @@ impl RequestHandler for EnterHandler { .error(AppErrorKind::DbQueryFailed)?; } + let mut requests = Vec::with_capacity(2); // Send dynamic subscription creation request to the broker. - let subject = reqp.as_agent_id().to_owned(); - let object = vec![ - "rooms".to_string(), - room_id.to_owned(), - "events".to_string(), - ]; - let payload = SubscriptionRequest::new(subject.clone(), object.clone()); - - let broker_id = AgentId::new("nevermind", context.config().broker_id.to_owned()); + let subscribe_req = + subscription_request(context, &reqp, &room_id, "subscription.create", authz_time)?; - let response_topic = Subscription::unicast_responses_from(&broker_id) - .subscription_topic(context.agent_id(), API_VERSION) - .context("Failed to build response topic") - .error(AppErrorKind::BrokerRequestFailed)?; + requests.push(subscribe_req); - let corr_data_payload = CorrelationDataPayload::new(reqp.to_owned(), subject, object); - - let corr_data = CorrelationData::SubscriptionCreate(corr_data_payload) - .dump() - .context("Failed to dump correlation data") - .error(AppErrorKind::BrokerRequestFailed)?; + let broadcast_subscribe_req = subscription_request( + context, + &reqp, + &room_id, + "broadcast_subscription.create", + authz_time, + )?; - let mut timing = ShortTermTimingProperties::until_now(context.start_timestamp()); - timing.set_authorization_time(authz_time); + requests.push(broadcast_subscribe_req); - let props = reqp.to_request("subscription.create", &response_topic, &corr_data, timing); - let to = &context.config().broker_id; - let outgoing_request = OutgoingRequest::multicast(payload, props, to, API_VERSION); - let boxed_request = Box::new(outgoing_request) as Box; - Ok(Box::new(stream::once(boxed_request))) + Ok(Box::new(stream::from_iter(requests))) } } +fn subscription_request( + context: &mut C, + reqp: &IncomingRequestProperties, + room_id: &str, + method: &str, + authz_time: chrono::Duration, +) -> std::result::Result, AppError> { + let subject = reqp.as_agent_id().to_owned(); + let object = vec![ + "rooms".to_string(), + room_id.to_owned(), + "events".to_string(), + ]; + let payload = SubscriptionRequest::new(subject.clone(), object.clone()); + + let broker_id = AgentId::new("nevermind", context.config().broker_id.to_owned()); + + let response_topic = Subscription::unicast_responses_from(&broker_id) + .subscription_topic(context.agent_id(), API_VERSION) + .context("Failed to build response topic") + .error(AppErrorKind::BrokerRequestFailed)?; + + let corr_data_payload = CorrelationDataPayload::new(reqp.to_owned(), subject, object); + + let corr_data = CorrelationData::SubscriptionCreate(corr_data_payload) + .dump() + .context("Failed to dump correlation data") + .error(AppErrorKind::BrokerRequestFailed)?; + + let mut timing = ShortTermTimingProperties::until_now(context.start_timestamp()); + timing.set_authorization_time(authz_time); + + let props = reqp.to_request(method, &response_topic, &corr_data, timing); + let to = &context.config().broker_id; + let outgoing_request = OutgoingRequest::multicast(payload, props, to, API_VERSION); + let boxed_request = Box::new(outgoing_request) as Box; + Ok(boxed_request) +} + /////////////////////////////////////////////////////////////////////////////// #[derive(Debug, Deserialize)] @@ -1325,6 +1353,30 @@ mod tests { use super::super::*; use super::DynSubRequest; + #[test] + fn test_parsing() { + serde_json::from_str::( + r#" + {"id": "82f62913-c2ba-4b21-b24f-5ed499107c0a"} + "#, + ) + .expect("Failed to parse EnterRequest"); + + serde_json::from_str::( + r#" + {"id": "82f62913-c2ba-4b21-b24f-5ed499107c0a", "broadcast_subscription": true} + "#, + ) + .expect("Failed to parse EnterRequest"); + + serde_json::from_str::( + r#" + {"id": "82f62913-c2ba-4b21-b24f-5ed499107c0a", "broadcast_subscription": false} + "#, + ) + .expect("Failed to parse EnterRequest"); + } + #[test] fn enter_room() { async_std::task::block_on(async { @@ -1345,7 +1397,10 @@ mod tests { // Make room.enter request. let mut context = TestContext::new(db, authz); - let payload = EnterRequest { id: room.id() }; + let payload = EnterRequest { + id: room.id(), + broadcast_subscription: false, + }; let messages = handle_request::(&mut context, &agent, payload) .await @@ -1383,7 +1438,10 @@ mod tests { // Make room.enter request. let mut context = TestContext::new(db, TestAuthz::new()); - let payload = EnterRequest { id: room.id() }; + let payload = EnterRequest { + id: room.id(), + broadcast_subscription: false, + }; let err = handle_request::(&mut context, &agent, payload) .await @@ -1398,7 +1456,10 @@ mod tests { async_std::task::block_on(async { let agent = TestAgent::new("web", "user123", USR_AUDIENCE); let mut context = TestContext::new(TestDb::new().await, TestAuthz::new()); - let payload = EnterRequest { id: Uuid::new_v4() }; + let payload = EnterRequest { + id: Uuid::new_v4(), + broadcast_subscription: false, + }; let err = handle_request::(&mut context, &agent, payload) .await @@ -1429,7 +1490,10 @@ mod tests { // Make room.enter request. let mut context = TestContext::new(db, TestAuthz::new()); - let payload = EnterRequest { id: room.id() }; + let payload = EnterRequest { + id: room.id(), + broadcast_subscription: false, + }; let err = handle_request::(&mut context, &agent, payload) .await diff --git a/src/app/endpoint/subscription.rs b/src/app/endpoint/subscription.rs index 29218c96..2f3a617d 100644 --- a/src/app/endpoint/subscription.rs +++ b/src/app/endpoint/subscription.rs @@ -161,6 +161,42 @@ impl ResponseHandler for CreateResponseHandler { /////////////////////////////////////////////////////////////////////////////// +pub(crate) struct BroadcastCreateResponseHandler; + +#[async_trait] +impl ResponseHandler for BroadcastCreateResponseHandler { + type Payload = CreateDeleteResponsePayload; + type CorrelationData = CorrelationDataPayload; + + async fn handle( + context: &mut C, + _payload: Self::Payload, + respp: &IncomingResponseProperties, + _corr_data: &Self::CorrelationData, + ) -> Result { + // Check if the event is sent by the broker. + if respp.as_account_id() != &context.config().broker_id { + return Err(anyhow!( + "Expected broadcast_subscription.create event to be sent from the broker account '{}', got '{}'", + context.config().broker_id, + respp.as_account_id() + )).error(AppErrorKind::AccessDenied); + } + + context.add_logger_tags(o!( + "agent_label" => respp.as_agent_id().label().to_owned(), + "account_label" => respp.as_account_id().label().to_owned(), + "audience" => respp.as_account_id().audience().to_owned(), + )); + + warn!(context.logger(), "Broadcast subscription created"); + + Ok(Box::new(stream::empty())) + } +} + +/////////////////////////////////////////////////////////////////////////////// + pub(crate) struct DeleteResponseHandler; #[async_trait] @@ -236,6 +272,38 @@ impl ResponseHandler for DeleteResponseHandler { //////////////////////////////////////////////////////////////////////////////// +pub(crate) struct BroadcastDeleteResponseHandler; + +#[async_trait] +impl ResponseHandler for BroadcastDeleteResponseHandler { + type Payload = CreateDeleteResponsePayload; + type CorrelationData = CorrelationDataPayload; + + async fn handle( + context: &mut C, + _payload: Self::Payload, + respp: &IncomingResponseProperties, + corr_data: &Self::CorrelationData, + ) -> Result { + // Check if the event is sent by the broker. + if respp.as_account_id() != &context.config().broker_id { + return Err(anyhow!( + "Expected subscription.delete event to be sent from the broker account '{}', got '{}'", + context.config().broker_id, + respp.as_account_id() + )).error(AppErrorKind::AccessDenied); + } + + // Parse room id. + let room_id = try_room_id(&corr_data.object)?; + context.add_logger_tags(o!("room_id" => room_id.to_string())); + + Ok(Box::new(stream::empty())) + } +} + +//////////////////////////////////////////////////////////////////////////////// + #[derive(Debug, Deserialize)] pub(crate) struct DeleteEventPayload { subject: AgentId, @@ -306,6 +374,37 @@ impl EventHandler for DeleteEventHandler { /////////////////////////////////////////////////////////////////////////////// +pub(crate) struct BroadcastDeleteEventHandler; + +#[async_trait] +impl EventHandler for BroadcastDeleteEventHandler { + type Payload = DeleteEventPayload; + + async fn handle( + context: &mut C, + payload: Self::Payload, + evp: &IncomingEventProperties, + ) -> Result { + // Check if the event is sent by the broker. + if evp.as_account_id() != &context.config().broker_id { + return Err(anyhow!( + "Expected broadcast_subscription.delete event to be sent from the broker account '{}', got '{}'", + context.config().broker_id, + evp.as_account_id() + )).error(AppErrorKind::AccessDenied); + } + + let room_id = try_room_id(&payload.object)?; + context.add_logger_tags(o!("room_id" => room_id.to_string())); + + warn!(context.logger(), "Broadcast subscription deleted by event"); + + Ok(Box::new(stream::empty())) + } +} + +/////////////////////////////////////////////////////////////////////////////// + fn try_room_id(object: &[String]) -> StdResult { let object: Vec<&str> = object.iter().map(AsRef::as_ref).collect();