From 78608a1b9d0eee447aeedef2a4229734f8b33e20 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 3 Jun 2024 15:16:17 +0200 Subject: [PATCH] sending queue: do a few renamings after the live review Thanks @Hywan for the review comments! --- crates/matrix-sdk/src/client/mod.rs | 10 ++-- crates/matrix-sdk/src/send_queue.rs | 92 ++++++++++++++++------------- 2 files changed, 58 insertions(+), 44 deletions(-) diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index b4f7c8aa8db..652162c201c 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -284,8 +284,10 @@ pub(crate) struct ClientInner { #[cfg(feature = "e2e-encryption")] pub(crate) verification_state: SharedObservable, - /// Data related to the sending queue. - pub(crate) sending_queue: Arc, + /// Data related to the [`SendingQueue`]. + /// + /// [`SendingQueue`]: crate::send_queue::SendingQueue + pub(crate) sending_queue_data: Arc, } impl ClientInner { @@ -328,7 +330,7 @@ impl ClientInner { respect_login_well_known, sync_beat: event_listener::Event::new(), event_cache, - sending_queue, + sending_queue_data: sending_queue, #[cfg(feature = "e2e-encryption")] e2ee: EncryptionData::new(encryption_settings), #[cfg(feature = "e2e-encryption")] @@ -2108,7 +2110,7 @@ impl Client { self.inner.unstable_features.get().cloned(), self.inner.respect_login_well_known, self.inner.event_cache.clone(), - self.inner.sending_queue.clone(), + self.inner.sending_queue_data.clone(), #[cfg(feature = "e2e-encryption")] self.inner.e2ee.encryption_settings, ) diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 5eba6f5fad8..5d78d2564df 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -52,7 +52,7 @@ impl SendingQueue { } fn for_room(&self, room: Room) -> RoomSendingQueue { - let data = &self.client.inner.sending_queue; + let data = &self.client.inner.sending_queue_data; let mut map = data.rooms.write().unwrap(); @@ -63,8 +63,8 @@ impl SendingQueue { let owned_room_id = room_id.to_owned(); let room_q = RoomSendingQueue::new( - data.enabled.clone(), - data.shutting_down.clone(), + data.globally_enabled.clone(), + data.is_dropping.clone(), &self.client, owned_room_id.clone(), ); @@ -72,14 +72,14 @@ impl SendingQueue { room_q } - /// Enables the sending queue for the entire client, i.e. all rooms. + /// Enable the sending queue for the entire client, i.e. all rooms. /// - /// This may wake up backgrounds tasks and resume sending of events in the + /// This may wake up background tasks and resume sending of events in the /// background. pub fn enable(&self) { - if self.client.inner.sending_queue.enabled.set_if_not_eq(true).is_some() { + if self.client.inner.sending_queue_data.globally_enabled.set_if_not_eq(true).is_some() { debug!("globally enabling sending queue"); - let rooms = self.client.inner.sending_queue.rooms.read().unwrap(); + let rooms = self.client.inner.sending_queue_data.rooms.read().unwrap(); // Wake up the rooms, in case events have been queued in the meanwhile. for room in rooms.values() { room.inner.notifier.notify_one(); @@ -87,7 +87,7 @@ impl SendingQueue { } } - /// Disables the sending queue for the entire client, i.e. all rooms. + /// Disable the sending queue for the entire client, i.e. all rooms. /// /// If requests were being sent, they're not aborted, and will continue /// until a status resolves (error responses will keep the events in the @@ -101,19 +101,19 @@ impl SendingQueue { // - or they were not, and it's not worth it waking them to let them they're // disabled, which causes them to go to sleep again. debug!("globally disabling sending queue"); - self.client.inner.sending_queue.enabled.set(false); + self.client.inner.sending_queue_data.globally_enabled.set(false); } /// Returns whether the sending queue is enabled, at a client-wide /// granularity. pub fn is_enabled(&self) -> bool { - self.client.inner.sending_queue.enabled.get() + self.client.inner.sending_queue_data.globally_enabled.get() } /// A subscriber to the enablement status (enabled or disabled) of the /// sending queue. pub fn subscribe_status(&self) -> Subscriber { - self.client.inner.sending_queue.enabled.subscribe() + self.client.inner.sending_queue_data.globally_enabled.subscribe() } } @@ -130,19 +130,19 @@ pub(super) struct SendingQueueData { rooms: SyncRwLock>, /// Is the whole mechanism enabled or disabled? - enabled: SharedObservable, + globally_enabled: SharedObservable, - /// Are we shutting down the entire queue? - shutting_down: Arc, + /// Are we currently dropping the Client? + is_dropping: Arc, } impl SendingQueueData { /// Create the data for a sending queue, in the given enabled state. - pub fn new(enabled: bool) -> Self { + pub fn new(globally_enabled: bool) -> Self { Self { rooms: Default::default(), - enabled: SharedObservable::new(enabled), - shutting_down: Arc::new(false.into()), + globally_enabled: SharedObservable::new(globally_enabled), + is_dropping: Arc::new(false.into()), } } } @@ -151,8 +151,8 @@ impl Drop for SendingQueueData { fn drop(&mut self) { // Mark the whole sending queue as shutting down, then wake up all the room // queues so they're stopped too. - debug!("globally shutting down the sending queue"); - self.shutting_down.store(true, Ordering::SeqCst); + debug!("globally dropping the sending queue"); + self.is_dropping.store(true, Ordering::SeqCst); let rooms = self.rooms.read().unwrap(); for room in rooms.values() { @@ -184,14 +184,14 @@ impl std::fmt::Debug for RoomSendingQueue { impl RoomSendingQueue { fn new( - enabled: SharedObservable, - shutting_down: Arc, + globally_enabled: SharedObservable, + is_dropping: Arc, client: &Client, room_id: OwnedRoomId, ) -> Self { let (updates_sender, _) = broadcast::channel(32); - let queue = SharedQueue::new(); + let queue = QueueStorage::new(); let notifier = Arc::new(Notify::new()); let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id); @@ -201,8 +201,8 @@ impl RoomSendingQueue { queue.clone(), notifier.clone(), updates_sender.clone(), - enabled, - shutting_down, + globally_enabled, + is_dropping, )); Self { @@ -242,6 +242,7 @@ impl RoomSendingQueue { let transaction_id = self.inner.queue.push(content.clone()).await; trace!(%transaction_id, "manager sends an event to the background task"); + self.inner.notifier.notify_one(); let _ = self.inner.updates.send(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { @@ -261,29 +262,29 @@ impl RoomSendingQueue { #[instrument(skip_all, fields(room_id = %room.room_id()))] async fn sending_task( room: WeakRoom, - queue: SharedQueue, + queue: QueueStorage, notifier: Arc, updates: broadcast::Sender, - enabled: SharedObservable, - shutting_down: Arc, + globally_enabled: SharedObservable, + is_dropping: Arc, ) { info!("spawned the sending task"); loop { // A request to shut down should be preferred above everything else. - if shutting_down.load(Ordering::SeqCst) { + if is_dropping.load(Ordering::SeqCst) { trace!("shutting down!"); break; } - if !enabled.get() { + if !globally_enabled.get() { trace!("not enabled, sleeping"); // Wait for an explicit wakeup. notifier.notified().await; continue; } - let Some(queued_event) = queue.pop_next_to_send().await else { + let Some(queued_event) = queue.peek_next_to_send().await else { trace!("queue is empty, sleeping"); // Wait for an explicit wakeup. notifier.notified().await; @@ -293,7 +294,7 @@ impl RoomSendingQueue { trace!("received an event to send!"); let Some(room) = room.get() else { - if shutting_down.load(Ordering::SeqCst) { + if is_dropping.load(Ordering::SeqCst) { break; } error!("the weak room couldn't be upgraded but we're not shutting down?"); @@ -322,7 +323,7 @@ impl RoomSendingQueue { // Disable the queue after an error. // See comment in [`SendingQueue::disable()`]. - enabled.set(false); + globally_enabled.set(false); // In this case, we intentionally keep the event in the queue, but mark it as // not being sent anymore. @@ -360,10 +361,10 @@ struct RoomSendingQueueInner { /// content / deleting entries, all that will be required will be to /// manipulate the on-disk storage. In other words, the storage will become /// the one source of truth. - queue: SharedQueue, + queue: QueueStorage, /// A notifier that's updated any time common data is touched (stopped or - /// enabled statuses), or the associated room [`SharedQueue`]. + /// enabled statuses), or the associated room [`QueueStorage`]. notifier: Arc, /// Handle to the actual sending task. Unused, but kept alive along this @@ -375,13 +376,18 @@ struct RoomSendingQueueInner { struct QueuedEvent { event: AnyMessageLikeEventContent, transaction_id: OwnedTransactionId, + + /// Flag to indicate if an event has been scheduled for sending. + /// + /// Useful to indicate if cancelling could happen or if it was too late and + /// the event had already been sent. is_being_sent: bool, } #[derive(Clone)] -struct SharedQueue(Arc>>); +struct QueueStorage(Arc>>); -impl SharedQueue { +impl QueueStorage { /// Create a new synchronized queue for queuing events to be sent later. fn new() -> Self { Self(Arc::new(RwLock::new(VecDeque::with_capacity(16)))) @@ -402,13 +408,15 @@ impl SharedQueue { transaction_id } - /// Pops the next event to be sent, marking it as being sent. + /// Peeks the next event to be sent, marking it as being sent. /// /// It is required to call [`Self::mark_as_sent`] after it's been /// effectively sent. - async fn pop_next_to_send(&self) -> Option { + async fn peek_next_to_send(&self) -> Option { let mut q = self.0.write().await; if let Some(event) = q.front_mut() { + // TODO: This flag should probably live in memory when we have an actual + // storage. event.is_being_sent = true; Some(event.clone()) } else { @@ -416,7 +424,7 @@ impl SharedQueue { } } - /// Marks an event popped with [`Self::pop_next_to_send`] and identified + /// Marks an event popped with [`Self::peek_next_to_send`] and identified /// with the given transaction id as not being sent anymore, so it can /// be removed from the queue later. async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) { @@ -432,7 +440,11 @@ impl SharedQueue { /// transaction id as sent by removing it from the local queue. async fn mark_as_sent(&self, transaction_id: &TransactionId) { let mut q = self.0.write().await; - q.retain(|item| item.transaction_id != transaction_id); + if let Some(index) = q.iter().position(|item| item.transaction_id == transaction_id) { + q.remove(index); + } else { + warn!("couldn't find item to mark as sent with transaction id {transaction_id}"); + } } /// Cancel a sending command for an event that has been sent with