Skip to content

Commit

Permalink
cache credentials in memory, refresh subscriptions on new account
Browse files Browse the repository at this point in the history
  • Loading branch information
ranfdev committed Nov 15, 2023
1 parent f13146e commit bbca7a4
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 65 deletions.
103 changes: 103 additions & 0 deletions ntfy-daemon/src/credentials.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::rc::Rc;

#[derive(Debug, Clone)]
pub struct Credential {
pub username: String,
pub password: String,
}

#[derive(Debug, Clone)]
pub struct Credentials {
keyring: Rc<oo7::Keyring>,
creds: Rc<RefCell<HashMap<String, Credential>>>,
}

impl Credentials {
pub async fn new() -> anyhow::Result<Self> {
let mut this = Self {
keyring: Rc::new(
oo7::Keyring::new()
.await
.expect("Failed to start Secret Service"),
),
creds: Default::default(),
};
this.load().await?;
Ok(this)
}
pub async fn load(&mut self) -> anyhow::Result<()> {
let attrs = HashMap::from([("type", "password")]);
let values = self
.keyring
.search_items(attrs)
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;

self.creds.borrow_mut().clear();
for item in values {
let attrs = item
.attributes()
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;
self.creds.borrow_mut().insert(
attrs["server"].to_string(),
Credential {
username: attrs["username"].to_string(),
password: std::str::from_utf8(&item.secret().await?)?.to_string(),
},
);
}
Ok(())
}
pub fn get(&self, server: &str) -> Option<Credential> {
self.creds.borrow().get(server).cloned()
}
pub fn list_all(&self) -> HashMap<String, Credential> {
self.creds.borrow().clone()
}
pub async fn insert(&self, server: &str, username: &str, password: &str) -> anyhow::Result<()> {
let attrs = HashMap::from([
("type", "password"),
("username", username),
("server", server),
]);
self.keyring
.create_item("Password", attrs, password, true)
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;

self.creds.borrow_mut().insert(
server.to_string(),
Credential {
username: username.to_string(),
password: password.to_string(),
},
);
Ok(())
}
pub async fn delete(&self, server: &str) -> anyhow::Result<()> {
let creds = {
self.creds
.borrow()
.get(server)
.ok_or(anyhow::anyhow!("server creds not found"))?
.clone()
};
let attrs = HashMap::from([
("type", "password"),
("username", &creds.username),
("server", server),
]);
self.keyring
.delete(attrs)
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;
self.creds
.borrow_mut()
.remove(server)
.ok_or(anyhow::anyhow!("server creds not found"))?;
Ok(())
}
}
3 changes: 2 additions & 1 deletion ntfy-daemon/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod credentials;
pub mod message_repo;
pub mod models;
pub mod retry;
Expand All @@ -16,7 +17,7 @@ pub struct SharedEnv {
proxy: Arc<dyn models::NotificationProxy>,
http: reqwest::Client,
network: Arc<dyn models::NetworkMonitorProxy>,
keyring: Rc<oo7::Keyring>,
credentials: credentials::Credentials,
}

#[derive(thiserror::Error, Debug)]
Expand Down
1 change: 1 addition & 0 deletions ntfy-daemon/src/ntfy.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interface Subscription {
updateReadUntil @4 (value: UInt64);

clearNotifications @5 ();
refresh @6 ();
}

struct Account {
Expand Down
86 changes: 45 additions & 41 deletions ntfy-daemon/src/system_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,20 @@ impl subscription::Server for SubscriptionImpl {
model.read_until = value;
Promise::ok(())
}
fn refresh(
&mut self,
_: subscription::RefreshParams,
_: subscription::RefreshResults,
) -> capnp::capability::Promise<(), capnp::Error> {
let sender = self.topic_listener.clone();
Promise::from_future(async move {
sender
.send(ControlFlow::Continue(()))
.await
.map_err(|e| capnp::Error::failed(format!("{:?}", e)))?;
Ok(())
})
}
}

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
Expand All @@ -343,7 +357,7 @@ impl SystemNotifier {
dbpath: &str,
notification_proxy: Arc<dyn models::NotificationProxy>,
network: Arc<dyn models::NetworkMonitorProxy>,
keyring: oo7::Keyring,
credentials: crate::credentials::Credentials,
) -> Self {
Self {
watching: Rc::new(RefCell::new(HashMap::new())),
Expand All @@ -352,7 +366,7 @@ impl SystemNotifier {
proxy: notification_proxy,
http: build_client().unwrap(),
network,
keyring: Rc::new(keyring),
credentials,
},
}
}
Expand Down Expand Up @@ -388,6 +402,18 @@ impl SystemNotifier {
Ok(())
})
}
pub fn refresh_all(&mut self) -> Promise<(), capnp::Error> {
let watching = self.watching.clone();
Promise::from_future(async move {
let reqs: Vec<_> = watching
.borrow()
.values()
.map(|w| w.refresh_request())
.collect();
join_all(reqs.into_iter().map(|x| x.send().promise)).await;
Ok(())
})
}
}

impl system_notifier::Server for SystemNotifier {
Expand Down Expand Up @@ -457,35 +483,26 @@ impl system_notifier::Server for SystemNotifier {
_: system_notifier::ListAccountsParams,
mut results: system_notifier::ListAccountsResults,
) -> capnp::capability::Promise<(), capnp::Error> {
let keyring = self.env.keyring.clone();
let values = self.env.credentials.list_all();

Promise::from_future(async move {
let attrs = HashMap::from([("type", "password")]);
let values = keyring
.search_items(attrs)
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;

let mut list = results.get().init_list(values.len() as u32);
for (i, item) in values.iter().enumerate() {
let attrs = item
.attributes()
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;
for (i, item) in values.into_iter().enumerate() {
let mut acc = list.reborrow().get(i as u32);
acc.set_username(attrs["username"][..].into());
acc.set_server(attrs["server"][..].into());
acc.set_server(item.0[..].into());
acc.set_username(item.1.username[..].into());
}
Ok(())
})
}
fn add_account(
&mut self,
params: system_notifier::AddAccountParams,
mut results: system_notifier::AddAccountResults,
_: system_notifier::AddAccountResults,
) -> capnp::capability::Promise<(), capnp::Error> {
let keyring = self.env.keyring.clone();
let credentials = self.env.credentials.clone();
let http = self.env.http.clone();
let refresh = self.refresh_all();
Promise::from_future(async move {
let account = params.get()?.get_account()?;
let username = account.get_username()?.to_str()?;
Expand All @@ -503,15 +520,11 @@ impl system_notifier::Server for SystemNotifier {
.error_for_status()
.map_err(|e| capnp::Error::failed(e.to_string()))?;

let attrs = HashMap::from([
("type", "password"),
("username", username),
("server", server),
]);
keyring
.create_item("Password", attrs, password, true)
credentials
.insert(server, username, password)
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;
refresh.await?;

info!(server = %server, username = %username, "added account");

Expand All @@ -521,21 +534,16 @@ impl system_notifier::Server for SystemNotifier {
fn remove_account(
&mut self,
params: system_notifier::RemoveAccountParams,
mut results: system_notifier::RemoveAccountResults,
_: system_notifier::RemoveAccountResults,
) -> capnp::capability::Promise<(), capnp::Error> {
let keyring = self.env.keyring.clone();
let credentials = self.env.credentials.clone();
Promise::from_future(async move {
let account = params.get()?.get_account()?;
let username = account.get_username()?.to_str()?;
let server = account.get_server()?.to_str()?;

let attrs = HashMap::from([
("type", "password"),
("username", username),
("server", server),
]);
keyring
.delete(attrs)
credentials
.delete(server)
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;

Expand All @@ -561,17 +569,13 @@ pub fn start(
UnixListener::bind(&socket_path).unwrap()
});

let keyring = rt.block_on(async {
oo7::Keyring::new()
.await
.expect("Failed to start Secret Service")
});

let dbpath = dbpath.to_owned();
let f = move || {
let credentials =
rt.block_on(async { crate::credentials::Credentials::new().await.unwrap() });
let local = tokio::task::LocalSet::new();
let mut system_notifier =
SystemNotifier::new(&dbpath, notification_proxy, network_proxy, keyring);
SystemNotifier::new(&dbpath, notification_proxy, network_proxy, credentials);
local.spawn_local(async move {
system_notifier.watch_subscribed().await.unwrap();
let system_client: system_notifier::Client = capnp_rpc::new_client(system_notifier);
Expand Down
26 changes: 3 additions & 23 deletions ntfy-daemon/src/topic_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,34 +170,14 @@ impl TopicListener {

#[instrument(skip_all)]
async fn recv_and_forward(&mut self) -> anyhow::Result<()> {
let (username, password) = {
let attrs = HashMap::from([("type", "password"), ("server", &self.endpoint)]);
let items = self
.env
.keyring
.search_items(attrs)
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;

if let Some(item) = items.into_iter().next() {
let attrs = item
.attributes()
.await
.map_err(|e| capnp::Error::failed(e.to_string()))?;
let password = item.secret().await?;
let password = std::str::from_utf8(&*password)?;
(attrs.get("username").cloned(), Some(password.to_string()))
} else {
(None, None)
}
};
let creds = self.env.credentials.get(&self.endpoint);
let req = topic_request(
&self.env.http,
&self.endpoint,
&self.topic,
self.since,
username.as_deref(),
password.as_deref(),
creds.as_ref().map(|x| x.username.as_str()),
creds.as_ref().map(|x| x.password.as_str()),
);
let res = self.env.http.execute(req?).await?;
let reader = tokio_util::io::StreamReader::new(
Expand Down

0 comments on commit bbca7a4

Please sign in to comment.