Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
added broadcast_subscription flag to room.enter
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Jul 19, 2021
1 parent 738fc21 commit 8a16b87
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 35 deletions.
7 changes: 4 additions & 3 deletions docs/src/api/room/enter.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ The tenant authorizes the current _agent_ for `read` action on

## Multicast request

Name | Type | Default | Description
---- | ---- | ---------- | --------------------
id | uuid | _required_ | The room identifier.
Name | Type | Default | Description
---------------------- | ---- | ---------- | --------------------
id | uuid | _required_ | The room identifier.
broadcast_subscription | bool | false | Whether also to subscribe to broadcast topic `broadcasts/{EVENT_APP_ID}/api/v1/rooms/:id/events`

## Unicast response

Expand Down
9 changes: 7 additions & 2 deletions src/app/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ request_routes!(
pub(crate) enum CorrelationData {
SubscriptionCreate(subscription::CorrelationDataPayload),
SubscriptionDelete(subscription::CorrelationDataPayload),
BroadcastSubscriptionCreate(subscription::CorrelationDataPayload),
BroadcastSubscriptionDelete(subscription::CorrelationDataPayload),
}

#[async_trait]
Expand Down Expand Up @@ -112,7 +114,9 @@ macro_rules! response_routes {

response_routes!(
SubscriptionCreate => subscription::CreateResponseHandler,
SubscriptionDelete => subscription::DeleteResponseHandler
SubscriptionDelete => subscription::DeleteResponseHandler,
BroadcastSubscriptionCreate => subscription::BroadcastCreateResponseHandler,
BroadcastSubscriptionDelete => subscription::BroadcastDeleteResponseHandler
);

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -150,7 +154,8 @@ macro_rules! event_routes {
// Event routes configuration: label => EventHandler
event_routes!(
"metric.pull" => metric::PullHandler,
"subscription.delete" => subscription::DeleteEventHandler
"subscription.delete" => subscription::DeleteEventHandler,
"broadcast_subscription.delete" => subscription::BroadcastDeleteEventHandler
);

///////////////////////////////////////////////////////////////////////////////
Expand Down
124 changes: 94 additions & 30 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ impl RequestHandler for UpdateHandler {
#[derive(Debug, Deserialize)]
pub(crate) struct EnterRequest {
id: Uuid,
#[serde(default)]
broadcast_subscription: bool,
}

pub(crate) struct EnterHandler;
Expand Down Expand Up @@ -403,40 +405,66 @@ impl RequestHandler for EnterHandler {
.error(AppErrorKind::DbQueryFailed)?;
}

let mut requests = Vec::with_capacity(2);
// Send dynamic subscription creation request to the broker.
let subject = reqp.as_agent_id().to_owned();
let object = vec![
"rooms".to_string(),
room_id.to_owned(),
"events".to_string(),
];
let payload = SubscriptionRequest::new(subject.clone(), object.clone());

let broker_id = AgentId::new("nevermind", context.config().broker_id.to_owned());
let subscribe_req =
subscription_request(context, &reqp, &room_id, "subscription.create", authz_time)?;

let response_topic = Subscription::unicast_responses_from(&broker_id)
.subscription_topic(context.agent_id(), API_VERSION)
.context("Failed to build response topic")
.error(AppErrorKind::BrokerRequestFailed)?;
requests.push(subscribe_req);

let corr_data_payload = CorrelationDataPayload::new(reqp.to_owned(), subject, object);

let corr_data = CorrelationData::SubscriptionCreate(corr_data_payload)
.dump()
.context("Failed to dump correlation data")
.error(AppErrorKind::BrokerRequestFailed)?;
let broadcast_subscribe_req = subscription_request(
context,
&reqp,
&room_id,
"broadcast_subscription.create",
authz_time,
)?;

let mut timing = ShortTermTimingProperties::until_now(context.start_timestamp());
timing.set_authorization_time(authz_time);
requests.push(broadcast_subscribe_req);

let props = reqp.to_request("subscription.create", &response_topic, &corr_data, timing);
let to = &context.config().broker_id;
let outgoing_request = OutgoingRequest::multicast(payload, props, to, API_VERSION);
let boxed_request = Box::new(outgoing_request) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_request)))
Ok(Box::new(stream::from_iter(requests)))
}
}

fn subscription_request<C: Context>(
context: &mut C,
reqp: &IncomingRequestProperties,
room_id: &str,
method: &str,
authz_time: chrono::Duration,
) -> std::result::Result<Box<dyn IntoPublishableMessage + Send>, AppError> {
let subject = reqp.as_agent_id().to_owned();
let object = vec![
"rooms".to_string(),
room_id.to_owned(),
"events".to_string(),
];
let payload = SubscriptionRequest::new(subject.clone(), object.clone());

let broker_id = AgentId::new("nevermind", context.config().broker_id.to_owned());

let response_topic = Subscription::unicast_responses_from(&broker_id)
.subscription_topic(context.agent_id(), API_VERSION)
.context("Failed to build response topic")
.error(AppErrorKind::BrokerRequestFailed)?;

let corr_data_payload = CorrelationDataPayload::new(reqp.to_owned(), subject, object);

let corr_data = CorrelationData::SubscriptionCreate(corr_data_payload)
.dump()
.context("Failed to dump correlation data")
.error(AppErrorKind::BrokerRequestFailed)?;

let mut timing = ShortTermTimingProperties::until_now(context.start_timestamp());
timing.set_authorization_time(authz_time);

let props = reqp.to_request(method, &response_topic, &corr_data, timing);
let to = &context.config().broker_id;
let outgoing_request = OutgoingRequest::multicast(payload, props, to, API_VERSION);
let boxed_request = Box::new(outgoing_request) as Box<dyn IntoPublishableMessage + Send>;
Ok(boxed_request)
}

///////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -1325,6 +1353,30 @@ mod tests {
use super::super::*;
use super::DynSubRequest;

#[test]
fn test_parsing() {
serde_json::from_str::<EnterRequest>(
r#"
{"id": "82f62913-c2ba-4b21-b24f-5ed499107c0a"}
"#,
)
.expect("Failed to parse EnterRequest");

serde_json::from_str::<EnterRequest>(
r#"
{"id": "82f62913-c2ba-4b21-b24f-5ed499107c0a", "broadcast_subscription": true}
"#,
)
.expect("Failed to parse EnterRequest");

serde_json::from_str::<EnterRequest>(
r#"
{"id": "82f62913-c2ba-4b21-b24f-5ed499107c0a", "broadcast_subscription": false}
"#,
)
.expect("Failed to parse EnterRequest");
}

#[test]
fn enter_room() {
async_std::task::block_on(async {
Expand All @@ -1345,7 +1397,10 @@ mod tests {

// Make room.enter request.
let mut context = TestContext::new(db, authz);
let payload = EnterRequest { id: room.id() };
let payload = EnterRequest {
id: room.id(),
broadcast_subscription: false,
};

let messages = handle_request::<EnterHandler>(&mut context, &agent, payload)
.await
Expand Down Expand Up @@ -1383,7 +1438,10 @@ mod tests {

// Make room.enter request.
let mut context = TestContext::new(db, TestAuthz::new());
let payload = EnterRequest { id: room.id() };
let payload = EnterRequest {
id: room.id(),
broadcast_subscription: false,
};

let err = handle_request::<EnterHandler>(&mut context, &agent, payload)
.await
Expand All @@ -1398,7 +1456,10 @@ mod tests {
async_std::task::block_on(async {
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let mut context = TestContext::new(TestDb::new().await, TestAuthz::new());
let payload = EnterRequest { id: Uuid::new_v4() };
let payload = EnterRequest {
id: Uuid::new_v4(),
broadcast_subscription: false,
};

let err = handle_request::<EnterHandler>(&mut context, &agent, payload)
.await
Expand Down Expand Up @@ -1429,7 +1490,10 @@ mod tests {

// Make room.enter request.
let mut context = TestContext::new(db, TestAuthz::new());
let payload = EnterRequest { id: room.id() };
let payload = EnterRequest {
id: room.id(),
broadcast_subscription: false,
};

let err = handle_request::<EnterHandler>(&mut context, &agent, payload)
.await
Expand Down
99 changes: 99 additions & 0 deletions src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,42 @@ impl ResponseHandler for CreateResponseHandler {

///////////////////////////////////////////////////////////////////////////////

pub(crate) struct BroadcastCreateResponseHandler;

#[async_trait]
impl ResponseHandler for BroadcastCreateResponseHandler {
type Payload = CreateDeleteResponsePayload;
type CorrelationData = CorrelationDataPayload;

async fn handle<C: Context>(
context: &mut C,
_payload: Self::Payload,
respp: &IncomingResponseProperties,
_corr_data: &Self::CorrelationData,
) -> Result {
// Check if the event is sent by the broker.
if respp.as_account_id() != &context.config().broker_id {
return Err(anyhow!(
"Expected broadcast_subscription.create event to be sent from the broker account '{}', got '{}'",
context.config().broker_id,
respp.as_account_id()
)).error(AppErrorKind::AccessDenied);
}

context.add_logger_tags(o!(
"agent_label" => respp.as_agent_id().label().to_owned(),
"account_label" => respp.as_account_id().label().to_owned(),
"audience" => respp.as_account_id().audience().to_owned(),
));

warn!(context.logger(), "Broadcast subscription created");

Ok(Box::new(stream::empty()))
}
}

///////////////////////////////////////////////////////////////////////////////

pub(crate) struct DeleteResponseHandler;

#[async_trait]
Expand Down Expand Up @@ -236,6 +272,38 @@ impl ResponseHandler for DeleteResponseHandler {

////////////////////////////////////////////////////////////////////////////////

pub(crate) struct BroadcastDeleteResponseHandler;

#[async_trait]
impl ResponseHandler for BroadcastDeleteResponseHandler {
type Payload = CreateDeleteResponsePayload;
type CorrelationData = CorrelationDataPayload;

async fn handle<C: Context>(
context: &mut C,
_payload: Self::Payload,
respp: &IncomingResponseProperties,
corr_data: &Self::CorrelationData,
) -> Result {
// Check if the event is sent by the broker.
if respp.as_account_id() != &context.config().broker_id {
return Err(anyhow!(
"Expected subscription.delete event to be sent from the broker account '{}', got '{}'",
context.config().broker_id,
respp.as_account_id()
)).error(AppErrorKind::AccessDenied);
}

// Parse room id.
let room_id = try_room_id(&corr_data.object)?;
context.add_logger_tags(o!("room_id" => room_id.to_string()));

Ok(Box::new(stream::empty()))
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Deserialize)]
pub(crate) struct DeleteEventPayload {
subject: AgentId,
Expand Down Expand Up @@ -306,6 +374,37 @@ impl EventHandler for DeleteEventHandler {

///////////////////////////////////////////////////////////////////////////////

pub(crate) struct BroadcastDeleteEventHandler;

#[async_trait]
impl EventHandler for BroadcastDeleteEventHandler {
type Payload = DeleteEventPayload;

async fn handle<C: Context>(
context: &mut C,
payload: Self::Payload,
evp: &IncomingEventProperties,
) -> Result {
// Check if the event is sent by the broker.
if evp.as_account_id() != &context.config().broker_id {
return Err(anyhow!(
"Expected broadcast_subscription.delete event to be sent from the broker account '{}', got '{}'",
context.config().broker_id,
evp.as_account_id()
)).error(AppErrorKind::AccessDenied);
}

let room_id = try_room_id(&payload.object)?;
context.add_logger_tags(o!("room_id" => room_id.to_string()));

warn!(context.logger(), "Broadcast subscription deleted by event");

Ok(Box::new(stream::empty()))
}
}

///////////////////////////////////////////////////////////////////////////////

fn try_room_id(object: &[String]) -> StdResult<Uuid, AppError> {
let object: Vec<&str> = object.iter().map(AsRef::as_ref).collect();

Expand Down

0 comments on commit 8a16b87

Please sign in to comment.