Skip to content

Commit

Permalink
Use turso remote replica
Browse files Browse the repository at this point in the history
  • Loading branch information
kasuboski committed Jun 1, 2024
1 parent 83ffa4a commit 00fa362
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/target
*.db
*.db*
result
.direnv
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
inputsFrom = [bin];
packages = with pkgs; [
just
turso-cli

# github actions
act
Expand Down
109 changes: 73 additions & 36 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::env;
use std::sync::Arc;
use std::path::Path;

use anyhow::{Context, Result};
use chrono::Utc;
Expand All @@ -15,14 +15,40 @@ pub struct DB {
db: Arc<libsql::Database>,
}

pub enum ConnectionBacking<'a> {
pub enum ConnectionBacking {
#[allow(dead_code)] // used in tests...
Memory,
File(&'a dyn AsRef<Path>),
File(String),
Remote(TursoCreds),
RemoteReplica(TursoCreds, String),
}

pub async fn connect(conn_back: ConnectionBacking<'_>) -> Result<DB> {
pub struct TursoCreds {
pub url: String,
pub token: String,
}

impl TursoCreds {
pub fn from_env() -> Option<Self> {
Some(Self {
url: env::var("TURSO_URL").ok()?,
token: env::var("TURSO_TOKEN").ok()?,
})
}
}

pub async fn connect(conn_back: ConnectionBacking) -> Result<DB> {
let db = match conn_back {
ConnectionBacking::Remote(creds) => {
libsql::Builder::new_remote(creds.url, creds.token)
.build()
.await?
}
ConnectionBacking::RemoteReplica(creds, p) => {
libsql::Builder::new_remote_replica(p, creds.url, creds.token)
.build()
.await?
}
ConnectionBacking::File(p) => libsql::Builder::new_local(p).build().await?,
ConnectionBacking::Memory => libsql::Builder::new_local(":memory:").build().await?,
};
Expand Down Expand Up @@ -66,8 +92,9 @@ impl From<String> for EntryFilter {

impl DB {
pub(crate) async fn init(&self) -> Result<()> {
self.conn.execute_batch(
r#"
self.conn
.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS feeds
(
id TEXT PRIMARY KEY NOT NULL,
Expand All @@ -92,9 +119,9 @@ CREATE TABLE IF NOT EXISTS entries
feed TEXT
);
"#,
)
.await
.context("couldn't init db")
)
.await
.context("couldn't init db")
}

pub(crate) async fn add_feeds<T>(&self, feeds: T) -> Result<()>
Expand All @@ -114,20 +141,22 @@ CREATE TABLE IF NOT EXISTS entries
.context("couldn't prepare statement")?;

for f in feeds {
let _ = stmt.execute((
f.id,
f.name,
f.site_url,
f.feed_url,
f.last_fetched,
f.fetch_error,
f.category
)).await?;
let _ = stmt
.execute((
f.id,
f.name,
f.site_url,
f.feed_url,
f.last_fetched,
f.fetch_error,
f.category,
))
.await?;
stmt.reset();
}
}
tx.commit().await?;

Ok(())
}

Expand All @@ -154,10 +183,13 @@ CREATE TABLE IF NOT EXISTS entries
}

pub(crate) async fn update_feed_status(&self, id: String, error: Option<String>) -> Result<()> {
let mut stmt = self.conn.prepare(
"UPDATE feeds SET fetch_error = ?, last_fetched = ?
let mut stmt = self
.conn
.prepare(
"UPDATE feeds SET fetch_error = ?, last_fetched = ?
WHERE id = ?",
).await?;
)
.await?;

stmt.execute((error, UtcTime(Utc::now()), id)).await?;

Expand All @@ -175,17 +207,19 @@ CREATE TABLE IF NOT EXISTS entries
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
).await?;
for e in entries {
let _ = stmt.execute((
e.id,
e.title,
e.content_link,
e.comments_link,
e.robust_link,
e.published,
e.read,
e.starred,
e.feed
)).await?;
let _ = stmt
.execute((
e.id,
e.title,
e.content_link,
e.comments_link,
e.robust_link,
e.published,
e.read,
e.starred,
e.feed,
))
.await?;
stmt.reset();
}
}
Expand All @@ -210,7 +244,8 @@ CREATE TABLE IF NOT EXISTS entries
EntryFilter::All => "",
};
let statement_string = format!("SELECT id, title, content_link, comments_link, robust_link, published, read, starred, feed FROM entries {} {}", where_clause, order_clause);
let mut stmt = self.conn
let mut stmt = self
.conn
.prepare(&statement_string)
.await
.context("couldn't prepare statement")?;
Expand Down Expand Up @@ -241,7 +276,8 @@ CREATE TABLE IF NOT EXISTS entries
ordering: Ordering,
) -> Result<Vec<Entry>> {
{
let mut stmt = self.conn
let mut stmt = self
.conn
.prepare("UPDATE entries SET read = NOT read WHERE id = ?")
.await
.context("couldn't prepare statement")?;
Expand All @@ -257,7 +293,8 @@ CREATE TABLE IF NOT EXISTS entries
ordering: Ordering,
) -> Result<Vec<Entry>> {
{
let mut stmt = self.conn
let mut stmt = self
.conn
.prepare("UPDATE entries SET starred = NOT starred WHERE id = ?")
.await
.context("couldn't prepare statement")?;
Expand Down
22 changes: 15 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use anyhow::anyhow;

use askama_warp::Template;
use chrono::{DateTime, Utc};
use db::TursoCreds;
use feed_rs::parser;
use opml::OPML;
use rweb::*;
Expand Down Expand Up @@ -242,7 +243,7 @@ fn reject_anyhow(err: anyhow::Error) -> Rejection {
}

#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
if env::var_os("RUST_LOG").is_none() {
env::set_var("RUST_LOG", "feedreader=info");
}
Expand All @@ -262,13 +263,19 @@ async fn main() {
"Access-Control-Request-Headers",
])
.allow_methods(vec!["GET", "HEAD", "POST", "DELETE"]);
let db = if let Some(creds) = TursoCreds::from_env() {
if let Ok(db_path) = env::var("FEED_DB_PATH") {
db::connect(db::ConnectionBacking::RemoteReplica(creds, db_path))
} else {
db::connect(db::ConnectionBacking::Remote(creds))
}
} else if let Ok(db_path) = env::var("FEED_DB_PATH") {
db::connect(db::ConnectionBacking::File(db_path))
} else {
anyhow::bail!("You must specify one of turso creds or db filepath")
};

let db_path = env::var("FEED_DB_PATH")
.or::<Result<String, env::VarError>>(Ok("./feeds.db".to_string()))
.expect("couldn't set db path");
let db: db::DB = db::connect(db::ConnectionBacking::File(&db_path))
.await
.expect("couldn't open db");
let db = db.await.expect("couldn't open db");
db.init().await.expect("couldn't init db");

if let Ok(f) = env::var("FEED_OPML_FILE") {
Expand Down Expand Up @@ -422,6 +429,7 @@ async fn main() {
Box::pin(warp::serve(routes).run(([0, 0, 0, 0], 3030))),
)
.await;
Ok(())
}

#[get("/")]
Expand Down

0 comments on commit 00fa362

Please sign in to comment.