Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

DO Alarms patch #294

Merged
merged 10 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions packages/durable-objects/src/alarms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export class AlarmStore {
// 'objectName:hexId' -> DurableObjectAlarm [pulled from plugin.getObject]
#alarms: Map<string, DurableObjectAlarm> = new Map();
#alarmTimeout?: NodeJS.Timeout;
#callback?: (objectKey: string) => Promise<void>;

// build a map of all alarms from file storage if persist
async setupStore(storage: StorageFactory, persist?: boolean | string) {
Expand All @@ -38,20 +39,33 @@ export class AlarmStore {
}
}

#setAlarmTimeout(
now: number,
objectKey: string,
doAlarm: DurableObjectAlarm
) {
// if timeout was already created, delete alarm incase scheduledTime changed
if (doAlarm.timeout) clearTimeout(doAlarm.timeout);
// set alarm
doAlarm.timeout = setTimeout(() => {
this.#deleteAlarm(objectKey, doAlarm);
this.#callback?.(objectKey);
}, Math.max(doAlarm.scheduledTime - now, 0));
}

// any alarms 30 seconds in the future or sooner are returned
async setupAlarms(callback: (objectKey: string) => Promise<void>) {
async setupAlarms(callback?: (objectKey: string) => Promise<void>) {
if (typeof callback === "function") this.#callback = callback;
if (this.#alarmTimeout) return;
const now = Date.now();

// iterate the store. For every alarm within 30 seconds of now,
// setup a timeout and run the callback and then delete the alarm
for (const [objectKey, doAlarm] of this.#alarms) {
const { scheduledTime } = doAlarm;
// if the alarm is within the next 30 seconds, set a timeout
if (scheduledTime < now + 30_000) {
doAlarm.timeout = setTimeout(() => {
this.#deleteAlarm(objectKey, doAlarm);
callback(objectKey);
}, Math.max(scheduledTime - now, 0));
this.#setAlarmTimeout(now, objectKey, doAlarm);
}
}

Expand All @@ -60,7 +74,7 @@ export class AlarmStore {
// prior to our next check.
this.#alarmTimeout = setTimeout(() => {
this.#alarmTimeout = undefined;
this.setupAlarms(callback);
this.setupAlarms();
}, 30_000);
this.#alarmTimeout.unref();
}
Expand All @@ -74,10 +88,25 @@ export class AlarmStore {
}

async setAlarm(objectKey: string, scheduledTime: number | Date) {
const now = Date.now();
if (typeof scheduledTime !== "number")
scheduledTime = scheduledTime.getTime();
if (scheduledTime <= 0) {
throw TypeError("setAlarm() cannot be called with an alarm time <= 0");
}
// pull in the alarm or create a new one if it does not exist
const doAlarm: DurableObjectAlarm = this.#alarms.get(objectKey) ?? {
scheduledTime,
};
// update scheduledTime incase old alarm existed
doAlarm.scheduledTime = scheduledTime;
// if the alarm is within the next 31 seconds, set a timeout immediately
// add a second to ensure healthy overlap between alarm checks
if (scheduledTime < now + 31_000) {
this.#setAlarmTimeout(now, objectKey, doAlarm);
}
// set the alarm in the store
this.#alarms.set(objectKey, { scheduledTime });
this.#alarms.set(objectKey, doAlarm);
// store the alarm in storage
assert(this.#store);
await this.#store.put(objectKey, {
Expand Down
118 changes: 118 additions & 0 deletions packages/durable-objects/test/alarms.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import assert from "assert";
import { MemoryStorageFactory } from "@miniflare/shared-test";
import anyTest, { TestInterface } from "ava";
import { AlarmStore } from "../src/alarms";

interface Context {
alarmStore: AlarmStore;
}

const test = anyTest as TestInterface<Context>;

test.beforeEach((t) => {
const factory = new MemoryStorageFactory();
const alarmStore = new AlarmStore();
alarmStore.setupStore(factory);
t.context = { alarmStore };
});

test.afterEach((t) => {
const { alarmStore } = t.context;
alarmStore.dispose();
});

test("Alarms: check that a bridge is created", (t) => {
const { alarmStore } = t.context;
const bridge = alarmStore.buildBridge("test");
assert(bridge);
t.is(typeof bridge.setAlarm, "function");
t.is(typeof bridge.deleteAlarm, "function");
});

test("Alarms: setupAlarms and call setAlarm immediately", async (t) => {
t.plan(1);
const { alarmStore } = t.context;
await new Promise<null>((resolve) => {
alarmStore.setupAlarms(async (objectKey) => {
t.is(objectKey, "test");
resolve(null);
});
alarmStore.setAlarm("test", 1);
});
});

test("Alarms: wait a second before updating value", async (t) => {
t.plan(3);
const { alarmStore } = t.context;
let value = 3;
const promise = new Promise<null>((resolve) => {
alarmStore.setupAlarms(async (objectKey) => {
t.is(objectKey, "update");
value++;
resolve(null);
});
alarmStore.setAlarm("update", Date.now() + 1_000);
});
t.is(value, 3);
await promise;
t.is(value, 4);
});

test("Alarms: setAlarm returns undefined; deleteAlarm", async (t) => {
const { alarmStore } = t.context;
const alarm = await alarmStore.setAlarm("toDelete", Date.now() + 50_000);
t.is(alarm, undefined);
const deleted = await alarmStore.deleteAlarm("toDelete");
t.is(deleted, undefined);
t.pass();
});
test("Alarms: check delete worked via a wait period", async (t) => {
t.plan(1);
const { alarmStore } = t.context;
alarmStore.setupAlarms(async () => {
t.fail();
});
// set first alarm 1 second from now
await alarmStore.setAlarm("test", Date.now() + 1_000);
// delete said alarm
await alarmStore.deleteAlarm("test");
// wait an appropriate amount of time
await new Promise((resolve) => setTimeout(resolve, 2_000));
t.pass();
});

test("Alarms: setupAlarms and call setAlarm through the bridge", async (t) => {
t.plan(1);
const { alarmStore } = t.context;
const bridge = alarmStore.buildBridge("test");
await new Promise<null>((resolve) => {
alarmStore.setupAlarms(async (objectKey) => {
t.is(objectKey, "test");
resolve(null);
});
bridge.setAlarm(1);
});
});

test("Alarms: setupAlarms and call setAlarm twice. The second one should trigger", async (t) => {
t.plan(1);
const { alarmStore } = t.context;
const now = Date.now();
await new Promise<null>((resolve) => {
alarmStore.setupAlarms(async () => {
t.true(Date.now() - now > 2_000);
resolve(null);
});
// set first alarm 1 second from now
alarmStore.setAlarm("test", Date.now() + 1_000);
// set the second 5 seconds from now
alarmStore.setAlarm("test", Date.now() + 3_000);
});
});

test("Alarms: setTimeout of 0 throws", async (t) => {
const { alarmStore } = t.context;
await t.throwsAsync(async () => {
await alarmStore.setAlarm("test", 0);
});
});
4 changes: 2 additions & 2 deletions packages/durable-objects/test/storage.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -992,8 +992,8 @@ test("setAlarm: backing returns inputed number", async (t) => {
test("setAlarm: overide alarm", async (t) => {
const { storage } = t.context;
await storage.setAlarm(testNumber);
await storage.setAlarm(0);
t.is(await storage.getAlarm(), 0);
await storage.setAlarm(5);
t.is(await storage.getAlarm(), 5);
});
test("setAlarm: closes input gate unless allowConcurrency", async (t) => {
const { storage } = t.context;
Expand Down