Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OlmMachine.registerRoomKeysWithheldCallback #136

Merged
merged 5 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
- `EncryptionSettings.onlyAllowTrustedDevices` has been replaced with
`EncryptionSettings.sharingStrategy`, which adds the ability to share only
with cross-signed devices.
([#134](https://github.com/matrix-org/matrix-rust-sdk-crypto-wasm/pull/134))

**Other changes**

- Update matrix-rust-sdk to `11cbf849c`, which includes:
- Add `OlmMachine.registerRoomKeysWithheldCallback` to notify when we are
told that room keys have been withheld.
([#136](https://github.com/matrix-org/matrix-rust-sdk-crypto-wasm/pull/136))

- Update matrix-rust-sdk to `8d54bd92d`, which includes:

- refactor(sdk-crypto): Room key sharing, introduce extensible strategy
([#3605](https://github.com/matrix-org/matrix-rust-sdk/pull/3605))
Expand Down
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

185 changes: 101 additions & 84 deletions src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::{
collections::{BTreeMap, HashSet},
iter,
ops::Deref,
pin::{pin, Pin},
time::Duration,
Expand Down Expand Up @@ -36,7 +37,7 @@ use crate::{
requests::{outgoing_request_to_js_value, CrossSigningBootstrapRequests, ToDeviceRequest},
responses::{self, response_from_string},
store,
store::{RoomKeyInfo, StoreHandle},
store::{RoomKeyInfo, RoomKeyWithheldInfo, StoreHandle},
sync_events,
types::{self, RoomKeyImportResult, RoomSettings, SignatureVerification},
verification, vodozemac,
Expand Down Expand Up @@ -1305,14 +1306,41 @@ impl OlmMachine {
pub async fn register_room_key_updated_callback(&self, callback: Function) {
let stream = self.inner.store().room_keys_received_stream();

// fire up a promise chain which will call `cb` on each result from the stream
spawn_local(async move {
// take a reference to `callback` (which we then pass into the closure), to stop
// the callback being moved into the closure (which would mean we could only
// call the closure once)
let callback_ref = &callback;
stream.for_each(move |item| send_room_key_info_to_callback(callback_ref, item)).await;
});
copy_stream_to_callback(
stream,
|input| {
iter::once(
input.into_iter().map(RoomKeyInfo::from).map(JsValue::from).collect::<Array>(),
)
},
callback,
"room-key-received",
);
}

/// Register a callback which will be called whenever we receive a
/// notification that some room keys have been withheld.
///
/// `callback` should be a function that takes a single argument (an array
/// of {@link RoomKeyWithheldInfo}) and returns a Promise.
#[wasm_bindgen(js_name = "registerRoomKeysWithheldCallback")]
pub async fn register_room_keys_withheld_callback(&self, callback: Function) {
let stream = self.inner.store().room_keys_withheld_received_stream();

copy_stream_to_callback(
stream,
|input| {
iter::once(
input
.into_iter()
.map(RoomKeyWithheldInfo::from)
.map(JsValue::from)
.collect::<Array>(),
)
},
callback,
"room-key-withheld",
);
}

/// Register a callback which will be called whenever there is an update to
Expand All @@ -1324,14 +1352,18 @@ impl OlmMachine {
pub async fn register_user_identity_updated_callback(&self, callback: Function) {
let stream = self.inner.store().identities_stream_raw();

// fire up a promise chain which will call `cb` on each result from the stream
spawn_local(async move {
// take a reference to `callback` (which we then pass into the closure), to stop
// the callback being moved into the closure (which would mean we could only
// call the closure once)
let callback_ref = &callback;
stream.for_each(move |item| send_user_identities_to_callback(callback_ref, item)).await;
});
copy_stream_to_callback(
stream,
|(identity_updates, _)| {
identity_updates
.new
.into_iter()
.chain(identity_updates.changed.into_iter())
.map(|update| identifiers::UserId::from(update.user_id().to_owned()))
},
callback,
"user-identity-updated",
);
}

/// Register a callback which will be called whenever there is an update to
Expand All @@ -1343,15 +1375,25 @@ impl OlmMachine {
pub async fn register_devices_updated_callback(&self, callback: Function) {
let stream = self.inner.store().identities_stream_raw();

// fire up a promise chain which will call `callback` on each result from the
// stream
spawn_local(async move {
// take a reference to `callback` (which we then pass into the closure), to stop
// the callback being moved into the closure (which would mean we could only
// call the closure once)
let callback_ref = &callback;
stream.for_each(move |item| send_device_updates_to_callback(callback_ref, item)).await;
});
fn mapper(changes: (IdentityChanges, DeviceChanges)) -> iter::Once<Array> {
let (_, device_updates) = changes;

// get the user IDs of all the devices that have changed
let updated_chain = device_updates
.new
.into_iter()
.chain(device_updates.changed.into_iter())
.chain(device_updates.deleted.into_iter());

// put them in a set to make them unique
let updated_users: HashSet<String> =
HashSet::from_iter(updated_chain.map(|device| device.user_id().to_string()));

// ... and collect to a JS Array
iter::once(updated_users.into_iter().map(JsValue::from).collect())
}

copy_stream_to_callback(stream, mapper, callback, "device-updated");
}

/// Register a callback which will be called whenever a secret
Expand Down Expand Up @@ -1538,66 +1580,41 @@ impl OlmMachine {
}
}

// helper for register_room_key_received_callback: wraps the key info
// into our own RoomKeyInfo struct, and passes it into the javascript
// function
async fn send_room_key_info_to_callback(
callback: &Function,
room_key_info: Vec<matrix_sdk_crypto::store::RoomKeyInfo>,
) {
let rki: Array = room_key_info.into_iter().map(RoomKeyInfo::from).map(JsValue::from).collect();
match promise_result_to_future(callback.call1(&JsValue::NULL, &rki)).await {
Ok(_) => (),
Err(e) => {
warn!("Error calling room-key-received callback: {:?}", e);
}
}
}

// helper for register_user_identity_updated_callback: passes the user ID into
// the javascript function
async fn send_user_identities_to_callback(
callback: &Function,
(identity_updates, _): (IdentityChanges, DeviceChanges),
) {
let update_chain = identity_updates.new.into_iter().chain(identity_updates.changed.into_iter());
for update in update_chain {
let user_id = identifiers::UserId::from(update.user_id().to_owned());
match promise_result_to_future(callback.call1(&JsValue::NULL, &(user_id.into()))).await {
Ok(_) => (),
Err(e) => {
warn!("Error calling user-identity-updated callback: {:?}", e);
/// Helper for `register_*_callback` methods: fires off a background job (or
/// rather, a chain of JS promises) which will copy items from the stream to the
/// callback.
///
/// # Arguments
///
/// * `stream`: the stream to copy items from.
/// * `mapper`: a function which takes items from the stream, and converts them
/// to an iterator of values to send to the callback. Each entry in the
/// iterator will result in a call to the callback.
/// * `callback`: the javascript callback function.
/// * `callback_name`: a name for this type of callback, for error reporting.
fn copy_stream_to_callback<Item, MappedTypeIterator, MappedType>(
stream: impl Stream<Item = Item> + 'static,
mapper: impl Fn(Item) -> MappedTypeIterator + 'static,
callback: Function,
callback_name: &'static str,
) where
MappedTypeIterator: Iterator<Item = MappedType>,
MappedType: Into<JsValue>,
{
spawn_local(async move {
pin_mut!(stream);

while let Some(item) = stream.next().await {
for val in mapper(item) {
match promise_result_to_future(callback.call1(&JsValue::NULL, &val.into())).await {
Ok(_) => (),
Err(e) => {
warn!("Error calling {} callback: {:?}", callback_name, e);
}
}
}
}
}
}

// helper for register_device_updated_callback: passes the user IDs into
// the javascript function
async fn send_device_updates_to_callback(
callback: &Function,
(_, device_updates): (IdentityChanges, DeviceChanges),
) {
// get the user IDs of all the devices that have changed
let updated_chain = device_updates
.new
.into_iter()
.chain(device_updates.changed.into_iter())
.chain(device_updates.deleted.into_iter());
// put them in a set to make them unique
let updated_users: HashSet<String> =
HashSet::from_iter(updated_chain.map(|device| device.user_id().to_string()));
let updated_users_vec = Vec::from_iter(updated_users.iter());
match promise_result_to_future(
callback.call1(&JsValue::NULL, &serde_wasm_bindgen::to_value(&updated_users_vec).unwrap()),
)
.await
{
Ok(_) => (),
Err(e) => {
warn!("Error calling device-updated callback: {:?}", e);
}
}
});
}

// helper for register_secret_receive_callback: passes the secret name and value
Expand Down
47 changes: 46 additions & 1 deletion src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use wasm_bindgen::prelude::*;
use zeroize::{Zeroize, Zeroizing};

use crate::{
encryption::EncryptionAlgorithm, identifiers::RoomId, impl_from_to_inner,
encryption::EncryptionAlgorithm,
identifiers::{RoomId, UserId},
impl_from_to_inner,
vodozemac::Curve25519PublicKey,
};

Expand Down Expand Up @@ -205,6 +207,49 @@ impl RoomKeyInfo {
}
}

/// Information on a received `m.room_key.withheld` event.
#[wasm_bindgen]
#[derive(Debug)]
pub struct RoomKeyWithheldInfo {
pub(crate) inner: matrix_sdk_crypto::store::RoomKeyWithheldInfo,
}

impl_from_to_inner!(matrix_sdk_crypto::store::RoomKeyWithheldInfo => RoomKeyWithheldInfo);

#[wasm_bindgen]
impl RoomKeyWithheldInfo {
/// The User ID of the user that sent us the `m.room_key.withheld` message.
#[wasm_bindgen(getter)]
pub fn sender(&self) -> UserId {
self.inner.withheld_event.sender.to_owned().into()
}

/// The encryption algorithm of the session that is being withheld.
#[wasm_bindgen(getter)]
pub fn algorithm(&self) -> EncryptionAlgorithm {
self.inner.withheld_event.content.algorithm().into()
}

/// The `code` from the `m.room_key.withheld` message, such as
/// `m.unverified`.
#[wasm_bindgen(getter, js_name = "withheldCode")]
pub fn withheld_code(&self) -> String {
self.inner.withheld_event.content.withheld_code().as_str().to_owned()
}

/// The room ID of the session that is being withheld.
#[wasm_bindgen(getter, js_name = "roomId")]
pub fn room_id(&self) -> RoomId {
self.inner.room_id.to_owned().into()
}

/// The session ID of the session that is being withheld.
#[wasm_bindgen(getter, js_name = "sessionId")]
pub fn session_id(&self) -> String {
self.inner.session_id.to_owned()
}
}

/// Struct containing the bundle of secrets to fully activate a new device for
/// end-to-end encryption.
#[derive(Debug)]
Expand Down
Loading
Loading