Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix internal execution and deferable function type #125

Merged
merged 3 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/long-dancers-judge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@defer/client": patch
---

Fix InternalExecution interface
5 changes: 5 additions & 0 deletions .changeset/old-doors-explain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@defer/client": patch
---

update type definition for `DeferableFunction`
36 changes: 15 additions & 21 deletions src/backend/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ import version from "../version.js";
import { Counter } from "./local/counter.js";
import { KV } from "./local/kv.js";

interface InternalExecution {
interface InternalExecution<F extends DeferableFunction> {
id: string;
args: string;
func: DeferableFunction;
func: DeferredFunction<F>;
functionId: string;
functionName: string;
state: ExecutionState;
Expand All @@ -63,7 +63,7 @@ interface InternalExecution {
}

const concurrencyCounter = new Counter();
const executionsStore = new KV<InternalExecution>();
const executionsStore = new KV<InternalExecution<any>>();
const functionIdMapping = new Map<string, string>();
const promisesState = new Set<Promise<void>>();

Expand Down Expand Up @@ -137,7 +137,7 @@ function paginate<T>(

function isExecutionMatchFilter(
filters: ExecutionFilters | undefined,
execution: InternalExecution,
execution: InternalExecution<any>,
): boolean {
if (
filters?.states &&
Expand Down Expand Up @@ -189,7 +189,7 @@ function isExecutionMatchFilter(
return true;
}

function buildExecution(execution: InternalExecution): Execution {
function buildExecution(execution: InternalExecution<any>): Execution {
return {
id: execution.id,
state: execution.state,
Expand Down Expand Up @@ -232,9 +232,7 @@ async function loop(shouldRun: () => boolean): Promise<void> {
const execution = await executionsStore.transaction(
executionId,
async (execution) => {
const func = execution.func as DeferredFunction<
typeof execution.func
>;
const func = execution.func;
shouldDiscard =
execution.state === "created" &&
execution.discardAfter !== undefined &&
Expand Down Expand Up @@ -348,7 +346,7 @@ export async function enqueue<F extends DeferableFunction>(
}

const now = new Date();
const execution: InternalExecution = {
const execution: InternalExecution<F> = {
id: randomUUID(),
state: "created",
functionId: functionId,
Expand Down Expand Up @@ -464,7 +462,7 @@ export async function reRunExecution(
throw new ExecutionNotFound(`cannot find execution "${id}"`);

const now = new Date();
const newExecution: InternalExecution = {
const newExecution: InternalExecution<typeof execution.func.__fn> = {
id: randomUUID(),
state: "created",
functionId: execution.functionId,
Expand All @@ -490,12 +488,10 @@ export async function listExecutions(
const data = new Map<string, Execution>();

for (const executionId of executionIds) {
const execution = (await executionsStore.get(
executionId,
)) as InternalExecution;
const execution = await executionsStore.get(executionId);

if (isExecutionMatchFilter(filters, execution))
data.set(executionId, buildExecution(execution));
if (isExecutionMatchFilter(filters, execution!))
data.set(executionId, buildExecution(execution!));
}

return paginate(pageRequest, data);
Expand All @@ -510,15 +506,13 @@ export async function listExecutionAttempts(
const data = new Map<string, Execution>();

for (const executionId of executionIds) {
const execution = (await executionsStore.get(
executionId,
)) as InternalExecution;
const execution = await executionsStore.get(executionId);

if (
(execution.id === id || execution.retryOf === id) &&
isExecutionMatchFilter(filters, execution)
(execution!.id === id || execution!.retryOf === id) &&
isExecutionMatchFilter(filters, execution!)
)
data.set(executionId, buildExecution(execution));
data.set(executionId, buildExecution(execution!));
}

return paginate(pageRequest, data);
Expand Down
10 changes: 5 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export interface RetryPolicy {
maxInterval: number;
}

export type DeferableFunction = (...args: any) => Promise<any>;
export type DeferableFunction = (...args: any[]) => Promise<any>;

export interface ExecutionOptions {
delay?: Duration | Date;
Expand Down Expand Up @@ -467,10 +467,10 @@ export async function listExecutions(
* @throws {DeferError} when error is unknown
* @returns {Promise<Awaited<F>>}
*/
export function awaitResult<F extends DeferableFunction>(
export function awaitResult<F extends DeferableFunction, R = ReturnType<F>>(
fn: DeferredFunction<F>,
): (...args: Parameters<F>) => Promise<Awaited<F>> {
return async function (...args: Parameters<F>): Promise<Awaited<F>> {
): (...args: Parameters<F>) => Promise<Awaited<R>> {
return async function (...args: Parameters<F>): Promise<Awaited<R>> {
const enqueueResponse = await enqueue(fn, ...args);
await sleep(1000);

Expand All @@ -493,7 +493,7 @@ export function awaitResult<F extends DeferableFunction>(
throw error;
}
case "succeed":
return await getExecutionResult<Awaited<F>>(enqueueResponse.id);
return await getExecutionResult<Awaited<R>>(enqueueResponse.id);
case "aborted":
case "cancelled":
case "discarded":
Expand Down
Loading