From 8a25dce1bc91dbe96e445c6867141df31a98e218 Mon Sep 17 00:00:00 2001 From: ranfdev Date: Tue, 7 Nov 2023 19:57:54 +0100 Subject: [PATCH] Reduce cpu usage and fix network monitor --- Cargo.lock | 20 +++++++++++++--- Cargo.toml | 1 + ntfy-daemon/Cargo.toml | 1 - ntfy-daemon/src/lib.rs | 2 +- ntfy-daemon/src/models.rs | 6 +++++ ntfy-daemon/src/system_client.rs | 7 +++--- ntfy-daemon/src/topic_listener.rs | 17 ++++---------- src/application.rs | 39 +++++++++++++++++++++++++++---- 8 files changed, 68 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6122467..237319c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,6 +152,19 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-channel" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d37875bd9915b7d67c2f117ea2c30a0989874d0b2cb694fe25403c85763c0c9e" +dependencies = [ + "concurrent-queue", + "event-listener 3.0.1", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-executor" version = "1.6.0" @@ -184,7 +197,7 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" dependencies = [ - "async-channel", + "async-channel 1.9.0", "async-executor", "async-io 1.13.0", "async-lock 2.8.0", @@ -305,7 +318,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" dependencies = [ - "async-channel", + "async-channel 1.9.0", "async-global-executor", "async-io 1.13.0", "async-lock 2.8.0", @@ -408,7 +421,7 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c36a4d0d48574b3dd360b4b7d95cc651d2b6557b6402848a27d4b228a473e2a" dependencies = [ - "async-channel", + "async-channel 1.9.0", "async-lock 2.8.0", "async-task", "fastrand 2.0.1", @@ -1711,6 +1724,7 @@ version = "0.1.1" dependencies = [ "anyhow", "ashpd", + "async-channel 2.1.0", "capnp 0.18.3", "capnp-rpc", "chrono", diff --git a/Cargo.toml b/Cargo.toml index b28a55d..c557772 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,4 +31,5 @@ rand = "0.8.5" ureq = "2.7.1" futures = "0.3.0" ashpd = "0.6.0" +async-channel = "2.1.0" relm4-macros = { version = "0.6.2", features = [], default-features = false } diff --git a/ntfy-daemon/Cargo.toml b/ntfy-daemon/Cargo.toml index f934069..7d0be03 100644 --- a/ntfy-daemon/Cargo.toml +++ b/ntfy-daemon/Cargo.toml @@ -24,7 +24,6 @@ rusqlite = "0.29.0" rand = "0.8.5" reqwest = { version = "0.11.18", features = ["stream", "rustls-tls"]} url = "2.4.0" -ashpd = "0.6.0" generational-arena = "0.2.9" tracing = "0.1.37" thiserror = "1.0.49" diff --git a/ntfy-daemon/src/lib.rs b/ntfy-daemon/src/lib.rs index cd0d4a5..f85d8ca 100644 --- a/ntfy-daemon/src/lib.rs +++ b/ntfy-daemon/src/lib.rs @@ -14,7 +14,7 @@ pub struct SharedEnv { db: message_repo::Db, proxy: Arc, http: reqwest::Client, - network: Arc>, + network: Arc, } #[derive(thiserror::Error, Debug)] diff --git a/ntfy-daemon/src/models.rs b/ntfy-daemon/src/models.rs index c98b7e0..ba8778d 100644 --- a/ntfy-daemon/src/models.rs +++ b/ntfy-daemon/src/models.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; +use std::pin::Pin; use std::sync::OnceLock; +use futures::stream::Stream; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -316,3 +318,7 @@ pub struct Notification { pub trait NotificationProxy: Sync + Send { fn send(&self, n: Notification) -> anyhow::Result<()>; } + +pub trait NetworkMonitorProxy: Sync + Send { + fn listen(&self) -> Pin>>; +} diff --git a/ntfy-daemon/src/system_client.rs b/ntfy-daemon/src/system_client.rs index 587be84..92430f6 100644 --- a/ntfy-daemon/src/system_client.rs +++ b/ntfy-daemon/src/system_client.rs @@ -343,7 +343,7 @@ impl SystemNotifier { pub fn new( dbpath: &str, notification_proxy: Arc, - network: Arc>, + network: Arc, ) -> Self { Self { watching: Rc::new(RefCell::new(HashMap::new())), @@ -457,12 +457,12 @@ pub fn start( socket_path: std::path::PathBuf, dbpath: &str, notification_proxy: Arc, + network_proxy: Arc, ) -> anyhow::Result<()> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - let network_monitor = rt.block_on(async move { NetworkMonitor::new().await.unwrap() }); let listener = rt.block_on(async move { let _ = std::fs::remove_file(&socket_path); UnixListener::bind(&socket_path).unwrap() @@ -471,8 +471,7 @@ pub fn start( let dbpath = dbpath.to_owned(); let f = move || { let local = tokio::task::LocalSet::new(); - let mut system_notifier = - SystemNotifier::new(&dbpath, notification_proxy, Arc::new(network_monitor)); + let mut system_notifier = SystemNotifier::new(&dbpath, notification_proxy, network_proxy); local.spawn_local(async move { system_notifier.watch_subscribed().await.unwrap(); let system_client: system_notifier::Client = capnp_rpc::new_client(system_notifier); diff --git a/ntfy-daemon/src/topic_listener.rs b/ntfy-daemon/src/topic_listener.rs index d2c955f..39884a7 100644 --- a/ntfy-daemon/src/topic_listener.rs +++ b/ntfy-daemon/src/topic_listener.rs @@ -141,21 +141,14 @@ impl TopicListener { } async fn reload_on_network_change( - monitor: Arc>, + monitor: Arc, tx: mpsc::Sender>, ) -> anyhow::Result<()> { - let mut prev_available = false; - - loop { - let _ = monitor.receive_changed().await?; - let available = monitor.is_available().await?; - if available && !prev_available { - if let Err(e) = tx.send(ControlFlow::Continue(())).await { - return Err(e.into()); - } - } - prev_available = available; + let mut m = monitor.listen(); + while let Some(_) = m.next().await { + tx.send(ControlFlow::Continue(())).await?; } + Ok(()) } fn send_current_status(&mut self) -> impl Future> { diff --git a/src/application.rs b/src/application.rs index bc7132d..ad87310 100644 --- a/src/application.rs +++ b/src/application.rs @@ -1,8 +1,12 @@ +use std::cell::Cell; use std::path::Path; use std::path::PathBuf; +use std::pin::Pin; +use std::rc::Rc; use adw::subclass::prelude::*; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; +use futures::stream::Stream; use futures::AsyncReadExt; use gettextrs::gettext; use gio::SocketClient; @@ -242,6 +246,11 @@ impl NotifyApplication { let dbpath = glib::user_data_dir().join("com.ranfdev.Notify.sqlite"); info!(database_path = %dbpath.display()); + // Here I'm sending notifications to the desktop environment and listening for network changes. + // This should have been inside ntfy-daemon, but using portals from another thread causes the error + // `Invalid client serial` and it's broken. + // Until https://github.com/flatpak/xdg-dbus-proxy/issues/46 is solved, I have to handle these things + // in the main thread. Uff. let (tx, rx) = glib::MainContext::channel(Default::default()); let app = self.clone(); rx.attach(None, move |n: models::Notification| { @@ -268,17 +277,39 @@ impl NotifyApplication { glib::ControlFlow::Continue }); - struct Proxy(glib::Sender); - impl models::NotificationProxy for Proxy { + struct Proxies { + notification: glib::Sender, + } + impl models::NotificationProxy for Proxies { fn send(&self, n: models::Notification) -> anyhow::Result<()> { - self.0.send(n)?; + self.notification.send(n)?; Ok(()) } } + impl models::NetworkMonitorProxy for Proxies { + fn listen(&self) -> Pin>> { + let (tx, rx) = async_channel::bounded(1); + let mut prev_available = Rc::new(Cell::new(false)); + + gio::NetworkMonitor::default().connect_network_changed(move |_, available| { + dbg!("sent", available); + if available && !prev_available.get() { + if let Err(e) = tx.send_blocking(()) { + warn!(error = %e); + } + } + prev_available.replace(available); + }); + + Box::pin(rx) + } + } + let proxies = std::sync::Arc::new(Proxies { notification: tx }); ntfy_daemon::system_client::start( socket_path.to_owned(), dbpath.to_str().unwrap(), - std::sync::Arc::new(Proxy(tx)), + proxies.clone(), + proxies, ) .unwrap(); self.imp().hold_guard.set(self.hold()).unwrap();