From 0cae113ae3b6e7e237a497a4391eedc2f604e760 Mon Sep 17 00:00:00 2001 From: losfair Date: Thu, 21 Dec 2023 19:42:59 +0800 Subject: [PATCH] watch should stop when db closed --- cli/tests/unit/kv_test.ts | 20 ++++++++++++++++++++ ext/kv/lib.rs | 15 ++++++++++----- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 28c913f2151eab..4963882e1cc1aa 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -2248,3 +2248,23 @@ dbTest("set with key versionstamp suffix", async (db) => { "expected string, number, bigint, ArrayBufferView, boolean", ); }); + +Deno.test({ + name: "watch should stop when db closed", + async fn() { + const db = await Deno.openKv(":memory:"); + + const watch = db.watch([["a"]]); + const completion = (async () => { + for await (const _item of watch) { + // pass + } + })(); + + setTimeout(() => { + db.close(); + }, 100); + + await completion; + }, +}); diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index e78526856cd7d9..032a16863f912d 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -444,16 +444,21 @@ async fn op_kv_watch_next( let cancel_handle = resource.cancel_handle.clone(); let stream = RcRef::map(resource, |r| &r.stream) .borrow_mut() - .or_cancel(db_cancel_handle) - .or_cancel(cancel_handle) + .or_cancel(db_cancel_handle.clone()) + .or_cancel(cancel_handle.clone()) .await; let Ok(Ok(mut stream)) = stream else { return Ok(None); }; - // doesn't need a cancel handle because the stream ends when the database - // connection is closed - let Some(res) = stream.next().await else { + // We hold a strong reference to `resource`, so we can't rely on the stream + // being dropped when the db connection is closed + let Ok(Ok(Some(res))) = stream + .next() + .or_cancel(db_cancel_handle) + .or_cancel(cancel_handle) + .await + else { return Ok(None); };