Skip to content

Commit

Permalink
Reduce cpu usage and fix network monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
ranfdev committed Nov 7, 2023
1 parent ae2084c commit 8a25dce
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 25 deletions.
20 changes: 17 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ rand = "0.8.5"
ureq = "2.7.1"
futures = "0.3.0"
ashpd = "0.6.0"
async-channel = "2.1.0"
relm4-macros = { version = "0.6.2", features = [], default-features = false }
1 change: 0 additions & 1 deletion ntfy-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ rusqlite = "0.29.0"
rand = "0.8.5"
reqwest = { version = "0.11.18", features = ["stream", "rustls-tls"]}
url = "2.4.0"
ashpd = "0.6.0"
generational-arena = "0.2.9"
tracing = "0.1.37"
thiserror = "1.0.49"
Expand Down
2 changes: 1 addition & 1 deletion ntfy-daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct SharedEnv {
db: message_repo::Db,
proxy: Arc<dyn models::NotificationProxy>,
http: reqwest::Client,
network: Arc<ashpd::desktop::network_monitor::NetworkMonitor<'static>>,
network: Arc<dyn models::NetworkMonitorProxy>,
}

#[derive(thiserror::Error, Debug)]
Expand Down
6 changes: 6 additions & 0 deletions ntfy-daemon/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::OnceLock;

use futures::stream::Stream;
use regex::Regex;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -316,3 +318,7 @@ pub struct Notification {
pub trait NotificationProxy: Sync + Send {
fn send(&self, n: Notification) -> anyhow::Result<()>;
}

pub trait NetworkMonitorProxy: Sync + Send {
fn listen(&self) -> Pin<Box<dyn Stream<Item = ()>>>;
}
7 changes: 3 additions & 4 deletions ntfy-daemon/src/system_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ impl SystemNotifier {
pub fn new(
dbpath: &str,
notification_proxy: Arc<dyn models::NotificationProxy>,
network: Arc<NetworkMonitor<'static>>,
network: Arc<dyn models::NetworkMonitorProxy>,
) -> Self {
Self {
watching: Rc::new(RefCell::new(HashMap::new())),
Expand Down Expand Up @@ -457,12 +457,12 @@ pub fn start(
socket_path: std::path::PathBuf,
dbpath: &str,
notification_proxy: Arc<dyn models::NotificationProxy>,
network_proxy: Arc<dyn models::NetworkMonitorProxy>,
) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

let network_monitor = rt.block_on(async move { NetworkMonitor::new().await.unwrap() });
let listener = rt.block_on(async move {
let _ = std::fs::remove_file(&socket_path);
UnixListener::bind(&socket_path).unwrap()
Expand All @@ -471,8 +471,7 @@ pub fn start(
let dbpath = dbpath.to_owned();
let f = move || {
let local = tokio::task::LocalSet::new();
let mut system_notifier =
SystemNotifier::new(&dbpath, notification_proxy, Arc::new(network_monitor));
let mut system_notifier = SystemNotifier::new(&dbpath, notification_proxy, network_proxy);
local.spawn_local(async move {
system_notifier.watch_subscribed().await.unwrap();
let system_client: system_notifier::Client = capnp_rpc::new_client(system_notifier);
Expand Down
17 changes: 5 additions & 12 deletions ntfy-daemon/src/topic_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,14 @@ impl TopicListener {
}

async fn reload_on_network_change(
monitor: Arc<NetworkMonitor<'static>>,
monitor: Arc<dyn models::NetworkMonitorProxy>,
tx: mpsc::Sender<ControlFlow<()>>,
) -> anyhow::Result<()> {
let mut prev_available = false;

loop {
let _ = monitor.receive_changed().await?;
let available = monitor.is_available().await?;
if available && !prev_available {
if let Err(e) = tx.send(ControlFlow::Continue(())).await {
return Err(e.into());
}
}
prev_available = available;
let mut m = monitor.listen();
while let Some(_) = m.next().await {
tx.send(ControlFlow::Continue(())).await?;
}
Ok(())
}

fn send_current_status(&mut self) -> impl Future<Output = anyhow::Result<()>> {
Expand Down
39 changes: 35 additions & 4 deletions src/application.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::cell::Cell;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;

use adw::subclass::prelude::*;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::stream::Stream;
use futures::AsyncReadExt;
use gettextrs::gettext;
use gio::SocketClient;
Expand Down Expand Up @@ -242,6 +246,11 @@ impl NotifyApplication {
let dbpath = glib::user_data_dir().join("com.ranfdev.Notify.sqlite");
info!(database_path = %dbpath.display());

// Here I'm sending notifications to the desktop environment and listening for network changes.
// This should have been inside ntfy-daemon, but using portals from another thread causes the error
// `Invalid client serial` and it's broken.
// Until https://github.com/flatpak/xdg-dbus-proxy/issues/46 is solved, I have to handle these things
// in the main thread. Uff.
let (tx, rx) = glib::MainContext::channel(Default::default());
let app = self.clone();
rx.attach(None, move |n: models::Notification| {
Expand All @@ -268,17 +277,39 @@ impl NotifyApplication {
glib::ControlFlow::Continue
});

struct Proxy(glib::Sender<models::Notification>);
impl models::NotificationProxy for Proxy {
struct Proxies {
notification: glib::Sender<models::Notification>,
}
impl models::NotificationProxy for Proxies {
fn send(&self, n: models::Notification) -> anyhow::Result<()> {
self.0.send(n)?;
self.notification.send(n)?;
Ok(())
}
}
impl models::NetworkMonitorProxy for Proxies {
fn listen(&self) -> Pin<Box<dyn Stream<Item = ()>>> {
let (tx, rx) = async_channel::bounded(1);
let mut prev_available = Rc::new(Cell::new(false));

gio::NetworkMonitor::default().connect_network_changed(move |_, available| {
dbg!("sent", available);
if available && !prev_available.get() {
if let Err(e) = tx.send_blocking(()) {
warn!(error = %e);
}
}
prev_available.replace(available);
});

Box::pin(rx)
}
}
let proxies = std::sync::Arc::new(Proxies { notification: tx });
ntfy_daemon::system_client::start(
socket_path.to_owned(),
dbpath.to_str().unwrap(),
std::sync::Arc::new(Proxy(tx)),
proxies.clone(),
proxies,
)
.unwrap();
self.imp().hold_guard.set(self.hold()).unwrap();
Expand Down

0 comments on commit 8a25dce

Please sign in to comment.