Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ext/kv): send queue wake messages accross different kv instances #20465

Merged
merged 8 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions cli/tests/unit/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1819,6 +1819,40 @@ Deno.test({
},
});

Deno.test({
name: "different kv instances for enqueue and queueListen",
async fn() {
const filename = await Deno.makeTempFile({ prefix: "queue_db" });
try {
const db0 = await Deno.openKv(filename);
const db1 = await Deno.openKv(filename);
const promise = deferred();
let dequeuedMessage: unknown = null;
const listener = db0.listenQueue((msg) => {
dequeuedMessage = msg;
promise.resolve();
});
try {
const res = await db1.enqueue("test");
assert(res.ok);
assertNotEquals(res.versionstamp, null);
await promise;
assertEquals(dequeuedMessage, "test");
} finally {
db0.close();
await listener;
db1.close();
}
} finally {
try {
await Deno.remove(filename);
} catch {
// pass
}
}
},
});

Deno.test({
name: "queue graceful close",
async fn() {
Expand Down
1 change: 1 addition & 0 deletions ext/kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async-trait.workspace = true
base64.workspace = true
chrono.workspace = true
deno_core.workspace = true
deno_node.workspace = true
deno_unsync = "0.1.1"
hex.workspace = true
log.workspace = true
Expand Down
163 changes: 129 additions & 34 deletions ext/kv/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::env::current_dir;
use std::future::Future;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
Expand All @@ -23,11 +26,14 @@ use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::OpState;
use deno_node::PathClean;
use rand::Rng;
use rusqlite::params;
use rusqlite::OpenFlags;
use rusqlite::OptionalExtension;
use rusqlite::Transaction;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::sync::OnceCell;
Expand Down Expand Up @@ -212,30 +218,35 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
}
}

let conn = sqlite_retry_loop(|| {
let (conn, queue_waker_key) = sqlite_retry_loop(|| {
let path = path.clone();
let default_storage_dir = self.default_storage_dir.clone();
async move {
spawn_blocking(move || {
let conn = match (path.as_deref(), &default_storage_dir) {
(Some(":memory:"), _) | (None, None) => {
rusqlite::Connection::open_in_memory()?
}
(Some(path), _) => {
let flags =
OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
rusqlite::Connection::open_with_flags(path, flags)?
}
(None, Some(path)) => {
std::fs::create_dir_all(path)?;
let path = path.join("kv.sqlite3");
rusqlite::Connection::open(path)?
}
};
let (conn, queue_waker_key) =
match (path.as_deref(), &default_storage_dir) {
(Some(":memory:"), _) | (None, None) => {
(rusqlite::Connection::open_in_memory()?, None)
}
(Some(path), _) => {
let flags =
OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
let resolved_path = canonicalize_path(&PathBuf::from(path))?;
(
rusqlite::Connection::open_with_flags(path, flags)?,
Some(resolved_path),
)
}
(None, Some(path)) => {
std::fs::create_dir_all(path)?;
let path = path.join("kv.sqlite3");
(rusqlite::Connection::open(path.clone())?, Some(path))
}
};

conn.pragma_update(None, "journal_mode", "wal")?;

Ok::<_, AnyError>(conn)
Ok::<_, AnyError>((conn, queue_waker_key))
})
.await
.unwrap()
Expand Down Expand Up @@ -277,6 +288,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
Ok(SqliteDb {
conn,
queue: OnceCell::new(),
queue_waker_key,
expiration_watcher,
})
}
Expand All @@ -285,6 +297,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
pub struct SqliteDb {
conn: ProtectedConn,
queue: OnceCell<SqliteQueue>,
queue_waker_key: Option<PathBuf>,
expiration_watcher: deno_core::unsync::JoinHandle<()>,
}

Expand Down Expand Up @@ -363,7 +376,7 @@ pub struct DequeuedMessage {
conn: WeakProtectedConn,
id: String,
payload: Option<Vec<u8>>,
waker_tx: mpsc::Sender<()>,
waker_tx: broadcast::Sender<()>,
_permit: OwnedSemaphorePermit,
}

Expand Down Expand Up @@ -403,7 +416,7 @@ impl QueueMessageHandle for DequeuedMessage {
};
if requeued {
// If the message was requeued, wake up the dequeue loop.
self.waker_tx.send(()).await?;
let _ = self.waker_tx.send(());
}
Ok(())
}
Expand All @@ -422,15 +435,18 @@ struct SqliteQueue {
conn: ProtectedConn,
dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>,
concurrency_limiter: Arc<Semaphore>,
waker_tx: mpsc::Sender<()>,
waker_tx: broadcast::Sender<()>,
shutdown_tx: watch::Sender<()>,
}

impl SqliteQueue {
fn new(conn: ProtectedConn) -> Self {
fn new(
conn: ProtectedConn,
waker_tx: broadcast::Sender<()>,
waker_rx: broadcast::Receiver<()>,
) -> Self {
let conn_clone = conn.clone();
let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
let (waker_tx, waker_rx) = mpsc::channel::<()>(1);
let (dequeue_tx, dequeue_rx) = mpsc::channel::<(Vec<u8>, String)>(64);

spawn(async move {
Expand Down Expand Up @@ -486,11 +502,6 @@ impl SqliteQueue {
}))
}

async fn wake(&self) -> Result<(), AnyError> {
self.waker_tx.send(()).await?;
Ok(())
}

fn shutdown(&self) {
let _ = self.shutdown_tx.send(());
}
Expand All @@ -499,7 +510,7 @@ impl SqliteQueue {
conn: ProtectedConn,
dequeue_tx: mpsc::Sender<(Vec<u8>, String)>,
mut shutdown_rx: watch::Receiver<()>,
mut waker_rx: mpsc::Receiver<()>,
mut waker_rx: broadcast::Receiver<()>,
) -> Result<(), AnyError> {
loop {
let messages = SqliteDb::run_tx(conn.clone(), move |tx| {
Expand Down Expand Up @@ -575,7 +586,9 @@ impl SqliteQueue {
};
tokio::select! {
_ = sleep_fut => {}
x = waker_rx.recv() => if x.is_none() {return Ok(());},
x = waker_rx.recv() => {
if let Err(RecvError::Closed) = x {return Ok(());}
},
_ = shutdown_rx.changed() => return Ok(())
}
}
Expand Down Expand Up @@ -773,7 +786,7 @@ impl Database for SqliteDb {

async fn atomic_write(
&self,
_state: Rc<RefCell<OpState>>,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
let write = Arc::new(write);
Expand Down Expand Up @@ -892,20 +905,39 @@ impl Database for SqliteDb {
.await?;

if has_enqueues {
if let Some(queue) = self.queue.get() {
queue.wake().await?;
match self.queue.get() {
Some(queue) => {
let _ = queue.waker_tx.send(());
}
None => {
if let Some(waker_key) = &self.queue_waker_key {
let (waker_tx, _) =
shared_queue_waker_channel(waker_key, state.clone());
let _ = waker_tx.send(());
}
}
}
}
Ok(commit_result)
}

async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
state: Rc<RefCell<OpState>>,
) -> Result<Option<Self::QMH>, AnyError> {
let queue = self
.queue
.get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })
.get_or_init(|| async move {
let (waker_tx, waker_rx) = {
match &self.queue_waker_key {
Some(waker_key) => {
shared_queue_waker_channel(waker_key, state.clone())
}
None => broadcast::channel(1),
}
};
SqliteQueue::new(self.conn.clone(), waker_tx, waker_rx)
})
.await;
let handle = queue.dequeue().await?;
Ok(handle)
Expand Down Expand Up @@ -1012,6 +1044,69 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
}
}

pub struct QueueWaker {
wakers_tx: HashMap<PathBuf, broadcast::Sender<()>>,
}

fn shared_queue_waker_channel(
waker_key: &Path,
state: Rc<RefCell<OpState>>,
) -> (broadcast::Sender<()>, broadcast::Receiver<()>) {
let mut state = state.borrow_mut();
let waker = {
let waker = state.try_borrow_mut::<QueueWaker>();
match waker {
Some(waker) => waker,
None => {
let waker = QueueWaker {
wakers_tx: HashMap::new(),
};
state.put::<QueueWaker>(waker);
state.borrow_mut::<QueueWaker>()
}
}
};

let waker_tx = waker
.wakers_tx
.entry(waker_key.to_path_buf())
.or_insert_with(|| {
let (waker_tx, _) = broadcast::channel(1);
waker_tx
});

(waker_tx.clone(), waker_tx.subscribe())
}

/// Same as Path::canonicalize, but also handles non-existing paths.
fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> {
let path = path.to_path_buf().clean();
let mut path = path;
let mut names_stack = Vec::new();
loop {
match path.canonicalize() {
Ok(mut canonicalized_path) => {
for name in names_stack.into_iter().rev() {
canonicalized_path = canonicalized_path.join(name);
}
return Ok(canonicalized_path);
}
Err(err) if err.kind() == ErrorKind::NotFound => {
let file_name = path.file_name().map(|os_str| os_str.to_os_string());
if let Some(file_name) = file_name {
names_stack.push(file_name.to_str().unwrap().to_string());
path = path.parent().unwrap().to_path_buf();
} else {
names_stack.push(path.to_str().unwrap().to_string());
let current_dir = current_dir()?;
path = current_dir.clone();
}
}
Err(err) => return Err(err.into()),
}
}
}

fn is_conn_closed_error(e: &AnyError) -> bool {
get_custom_error_class(e) == Some("TypeError")
&& e.to_string() == ERROR_USING_CLOSED_DATABASE
Expand Down