Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(worker-api/impl): pass stream to parent #324

Merged
merged 24 commits into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 23 additions & 71 deletions packages/waku/src/lib/handlers/dev-worker-api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import type { Worker as WorkerOrig } from 'node:worker_threads';
import type {
TransferListItem,
Worker as WorkerType,
} from 'node:worker_threads';

import type { ResolvedConfig } from '../config.js';
import type { ModuleImportResult } from './types.js';
Expand All @@ -19,34 +22,28 @@ export type BuildOutput = {
htmlFiles: string[];
};

export type MessageReq =
| ({
id: number;
type: 'render';
hasModuleIdCallback: boolean;
} & Omit<RenderRequest, 'stream' | 'moduleIdCallback'>)
| { id: number; type: 'buf'; buf: ArrayBuffer; offset: number; len: number }
| { id: number; type: 'end' }
| { id: number; type: 'err'; err: unknown };
export type MessageReq = {
id: number;
type: 'render';
hasModuleIdCallback: boolean;
} & Omit<RenderRequest, 'moduleIdCallback'>;

export type MessageRes =
| { type: 'full-reload' }
| { type: 'hot-import'; source: string }
| { type: 'module-import'; result: ModuleImportResult }
| { id: number; type: 'start'; context: unknown }
| { id: number; type: 'buf'; buf: ArrayBuffer; offset: number; len: number }
| { id: number; type: 'end' }
| { id: number; type: 'start'; context: unknown; stream: ReadableStream }
| { id: number; type: 'err'; err: unknown; statusCode?: number }
| { id: number; type: 'moduleId'; moduleId: string };

const messageCallbacks = new Map<number, (mesg: MessageRes) => void>();

let lastWorker: Promise<WorkerOrig> | undefined;
let lastWorker: Promise<WorkerType> | undefined;
const getWorker = () => {
if (lastWorker) {
return lastWorker;
}
return (lastWorker = new Promise<WorkerOrig>((resolve, reject) => {
return (lastWorker = new Promise<WorkerType>((resolve, reject) => {
Promise.all([
import('node:worker_threads').catch((e) => {
throw e;
Expand Down Expand Up @@ -124,69 +121,23 @@ export async function renderRscWithWorker<Context>(
): Promise<readonly [ReadableStream, Context]> {
const worker = await getWorker();
const id = nextId++;
const pipe = async () => {
if (rr.stream) {
const reader = rr.stream.getReader();
try {
let result: ReadableStreamReadResult<unknown>;
do {
result = await reader.read();
if (result.value) {
const buf = result.value;
let mesg: MessageReq;
if (buf instanceof ArrayBuffer) {
mesg = { id, type: 'buf', buf, offset: 0, len: buf.byteLength };
} else if (buf instanceof Uint8Array) {
mesg = {
id,
type: 'buf',
buf: buf.buffer,
offset: buf.byteOffset,
len: buf.byteLength,
};
} else {
throw new Error('Unexepected buffer type');
}
worker.postMessage(mesg, [mesg.buf]);
}
} while (!result.done);
} catch (err) {
const mesg: MessageReq = { id, type: 'err', err };
worker.postMessage(mesg);
}
}
const mesg: MessageReq = { id, type: 'end' };
worker.postMessage(mesg);
};
let started = false;
return new Promise((resolve, reject) => {
let controller: ReadableStreamDefaultController<Uint8Array>;
const stream = new ReadableStream({
start(c) {
controller = c;
},
});
messageCallbacks.set(id, (mesg) => {
if (mesg.type === 'start') {
if (!started) {
started = true;
resolve([stream, mesg.context as Context]);
const bridge = new TransformStream({
flush() {
messageCallbacks.delete(id);
},
});
resolve([mesg.stream.pipeThrough(bridge), mesg.context as Context]);
} else {
throw new Error('already started');
}
} else if (mesg.type === 'buf') {
if (!started) {
throw new Error('not yet started');
}
controller.enqueue(new Uint8Array(mesg.buf, mesg.offset, mesg.len));
} else if (mesg.type === 'moduleId') {
rr.moduleIdCallback?.(mesg.moduleId);
} else if (mesg.type === 'end') {
if (!started) {
throw new Error('not yet started');
}
controller.close();
messageCallbacks.delete(id);
} else if (mesg.type === 'err') {
const err =
mesg.err instanceof Error ? mesg.err : new Error(String(mesg.err));
Expand All @@ -195,8 +146,6 @@ export async function renderRscWithWorker<Context>(
}
if (!started) {
reject(err);
} else {
controller.error(err);
}
messageCallbacks.delete(id);
}
Expand All @@ -209,9 +158,12 @@ export async function renderRscWithWorker<Context>(
id,
type: 'render',
hasModuleIdCallback: !!rr.moduleIdCallback,
...(rr.stream ? { stream: rr.stream } : {}),
...copied,
};
worker.postMessage(mesg);
pipe();
worker.postMessage(
mesg,
rr.stream ? [rr.stream as unknown as TransferListItem] : undefined,
);
});
}
52 changes: 10 additions & 42 deletions packages/waku/src/lib/handlers/dev-worker-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import { pathToFileURL } from 'node:url';
import { parentPort } from 'node:worker_threads';
import { Server } from 'node:http';
import type { TransferListItem } from 'node:worker_threads';
import { createServer as createViteServer } from 'vite';

import type { EntriesDev } from '../../server.js';
import type { ResolvedConfig } from '../config.js';
import { joinPath, fileURLToFilePath } from '../utils/path.js';
import { hasStatusCode, deepFreeze } from '../renderers/utils.js';
import { deepFreeze, hasStatusCode } from '../renderers/utils.js';
import type {
MessageReq,
MessageRes,
Expand All @@ -26,18 +27,11 @@ const HAS_MODULE_REGISTER = typeof module.register === 'function';
if (HAS_MODULE_REGISTER) {
module.register('waku/node-loader', pathToFileURL('./'));
}
const controllerMap = new Map<number, ReadableStreamDefaultController>();

const handleRender = async (mesg: MessageReq & { type: 'render' }) => {
const { id, type: _removed, hasModuleIdCallback, ...rest } = mesg;
const rr: RenderRequest = rest;
try {
const stream = new ReadableStream({
start(controller) {
controllerMap.set(id, controller);
},
});
rr.stream = stream;
if (hasModuleIdCallback) {
rr.moduleIdCallback = (moduleId: string) => {
const mesg: MessageRes = { id, type: 'moduleId', moduleId };
Expand All @@ -50,36 +44,21 @@ const handleRender = async (mesg: MessageReq & { type: 'render' }) => {
searchParams: new URLSearchParams(rr.searchParamsString),
method: rr.method,
context: rr.context,
body: rr.stream,
body: rr.stream!,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we assure that it's not undefined?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I thought about this, and I could not find a solution, it required so many types gymnastics which I then could not get it right. Let me know if you have an idea.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah, it's optional. 204a34e should do for now. We can probably accept undefined so that we don't need such syntax.

contentType: rr.contentType,
...(rr.moduleIdCallback ? { moduleIdCallback: rr.moduleIdCallback } : {}),
isDev: true,
customImport: loadServerFile,
entries: await loadEntries(rr.config),
});
const mesg: MessageRes = { id, type: 'start', context: rr.context };
parentPort!.postMessage(mesg);
const mesg: MessageRes = {
id,
type: 'start',
context: rr.context,
stream: readable,
};
parentPort!.postMessage(mesg, [readable as unknown as TransferListItem]);
deepFreeze(rr.context);
dai-shi marked this conversation as resolved.
Show resolved Hide resolved
const writable = new WritableStream({
write(chunk) {
if (!(chunk instanceof Uint8Array)) {
throw new Error('Unknown chunk type');
}
const mesg: MessageRes = {
id,
type: 'buf',
buf: chunk.buffer,
offset: chunk.byteOffset,
len: chunk.byteLength,
};
parentPort!.postMessage(mesg, [mesg.buf]);
},
close() {
const mesg: MessageRes = { id, type: 'end' };
parentPort!.postMessage(mesg);
},
});
readable.pipeTo(writable);
} catch (err) {
const mesg: MessageRes = { id, type: 'err', err };
if (hasStatusCode(err)) {
Expand Down Expand Up @@ -142,16 +121,5 @@ const loadEntries = async (config: ResolvedConfig) => {
parentPort!.on('message', (mesg: MessageReq) => {
if (mesg.type === 'render') {
handleRender(mesg);
} else if (mesg.type === 'buf') {
const controller = controllerMap.get(mesg.id)!;
controller.enqueue(new Uint8Array(mesg.buf, mesg.offset, mesg.len));
} else if (mesg.type === 'end') {
const controller = controllerMap.get(mesg.id)!;
controller.close();
} else if (mesg.type === 'err') {
const controller = controllerMap.get(mesg.id)!;
const err =
mesg.err instanceof Error ? mesg.err : new Error(String(mesg.err));
controller.error(err);
}
});
Loading