-
Notifications
You must be signed in to change notification settings - Fork 27.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve internal web stream utils #53004
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
// Provider for the `useServerInsertedHTML` API to register callbacks to insert | ||
// elements into the HTML stream. | ||
|
||
import React from 'react' | ||
import { ServerInsertedHTMLContext } from '../../shared/lib/server-inserted-html' | ||
|
||
export function createServerInsertedHTML() { | ||
const serverInsertedHTMLCallbacks: (() => React.ReactNode)[] = [] | ||
const addInsertedHtml = (handler: () => React.ReactNode) => { | ||
serverInsertedHTMLCallbacks.push(handler) | ||
} | ||
|
||
return { | ||
ServerInsertedHTMLProvider({ children }: { children: JSX.Element }) { | ||
return ( | ||
<ServerInsertedHTMLContext.Provider value={addInsertedHtml}> | ||
{children} | ||
</ServerInsertedHTMLContext.Provider> | ||
) | ||
}, | ||
renderServerInsertedHTML() { | ||
return serverInsertedHTMLCallbacks.map((callback, index) => ( | ||
<React.Fragment key={'__next_server_inserted__' + index}> | ||
{callback()} | ||
</React.Fragment> | ||
)) | ||
}, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,17 +58,14 @@ export function readableStreamTee<T = any>( | |
const writer2 = transformStream2.writable.getWriter() | ||
|
||
const reader = readable.getReader() | ||
function read() { | ||
reader.read().then(({ done, value }) => { | ||
if (done) { | ||
writer.close() | ||
writer2.close() | ||
return | ||
} | ||
writer.write(value) | ||
writer2.write(value) | ||
read() | ||
}) | ||
async function read() { | ||
const { done, value } = await reader.read() | ||
if (done) { | ||
await Promise.all([writer.close(), writer2.close()]) | ||
return | ||
} | ||
await Promise.all([writer.write(value), writer2.write(value)]) | ||
await read() | ||
Comment on lines
+63
to
+68
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you capture a todo for this? Also writer2 vs writer2, can probably improve the names on these. Isn't immediately clear. |
||
} | ||
read() | ||
|
||
|
@@ -91,15 +88,14 @@ export function chainStreams<T>( | |
} | ||
|
||
export function streamFromArray(strings: string[]): ReadableStream<Uint8Array> { | ||
// Note: we use a TransformStream here instead of instantiating a ReadableStream | ||
// because the built-in ReadableStream polyfill runs strings through TextEncoder. | ||
const { readable, writable } = new TransformStream() | ||
|
||
const writer = writable.getWriter() | ||
strings.forEach((str) => writer.write(encodeText(str))) | ||
writer.close() | ||
|
||
return readable | ||
return new ReadableStream({ | ||
start(controller) { | ||
for (const str of strings) { | ||
controller.enqueue(encodeText(str)) | ||
} | ||
controller.close() | ||
}, | ||
}) | ||
Comment on lines
+91
to
+98
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suspect that this TransformStream trick is no longer needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like a todo to confirm |
||
} | ||
|
||
export async function streamToString( | ||
|
@@ -121,19 +117,19 @@ export async function streamToString( | |
} | ||
} | ||
|
||
export function createBufferedTransformStream( | ||
transform: (v: string) => string | Promise<string> = (v) => v | ||
): TransformStream<Uint8Array, Uint8Array> { | ||
let bufferedString = '' | ||
export function createBufferedTransformStream(): TransformStream< | ||
Uint8Array, | ||
Uint8Array | ||
> { | ||
let bufferedBytes: Uint8Array = new Uint8Array() | ||
let pendingFlush: Promise<void> | null = null | ||
|
||
const flushBuffer = (controller: TransformStreamDefaultController) => { | ||
if (!pendingFlush) { | ||
pendingFlush = new Promise((resolve) => { | ||
setTimeout(async () => { | ||
const buffered = await transform(bufferedString) | ||
controller.enqueue(encodeText(buffered)) | ||
bufferedString = '' | ||
controller.enqueue(bufferedBytes) | ||
bufferedBytes = new Uint8Array() | ||
pendingFlush = null | ||
resolve() | ||
}, 0) | ||
|
@@ -142,11 +138,14 @@ export function createBufferedTransformStream( | |
return pendingFlush | ||
} | ||
|
||
const textDecoder = new TextDecoder() | ||
|
||
return new TransformStream({ | ||
transform(chunk, controller) { | ||
bufferedString += decodeText(chunk, textDecoder) | ||
const newBufferedBytes = new Uint8Array( | ||
bufferedBytes.length + chunk.byteLength | ||
) | ||
newBufferedBytes.set(bufferedBytes) | ||
newBufferedBytes.set(chunk, bufferedBytes.length) | ||
bufferedBytes = newBufferedBytes | ||
Comment on lines
+143
to
+148
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is far more efficient than decoding and encoding every chunk. |
||
flushBuffer(controller) | ||
}, | ||
|
||
|
@@ -164,7 +163,6 @@ export function createInsertedHTMLStream( | |
return new TransformStream({ | ||
async transform(chunk, controller) { | ||
const insertedHTMLChunk = encodeText(await getServerInsertedHTML()) | ||
|
||
controller.enqueue(insertedHTMLChunk) | ||
controller.enqueue(chunk) | ||
}, | ||
|
@@ -237,7 +235,7 @@ function createHeadInsertionTransformStream( | |
|
||
// Suffix after main body content - scripts before </body>, | ||
// but wait for the major chunks to be enqueued. | ||
export function createDeferredSuffixStream( | ||
function createDeferredSuffixStream( | ||
suffix: string | ||
): TransformStream<Uint8Array, Uint8Array> { | ||
let suffixFlushed = false | ||
|
@@ -246,7 +244,7 @@ export function createDeferredSuffixStream( | |
return new TransformStream({ | ||
transform(chunk, controller) { | ||
controller.enqueue(chunk) | ||
if (!suffixFlushed && suffix) { | ||
if (!suffixFlushed && suffix.length) { | ||
suffixFlushed = true | ||
suffixFlushTask = new Promise((res) => { | ||
// NOTE: streaming flush | ||
|
@@ -261,7 +259,7 @@ export function createDeferredSuffixStream( | |
}, | ||
flush(controller) { | ||
if (suffixFlushTask) return suffixFlushTask | ||
if (!suffixFlushed && suffix) { | ||
if (!suffixFlushed && suffix.length) { | ||
suffixFlushed = true | ||
controller.enqueue(encodeText(suffix)) | ||
} | ||
|
@@ -287,6 +285,12 @@ export function createInlineDataStream( | |
// the safe timing to pipe the data stream, this extra tick is | ||
// necessary. | ||
dataStreamFinished = new Promise((res) => | ||
// We use `setTimeout` here to ensure that it's inserted after flushing | ||
// the shell. Note that this implementation might get stale if impl | ||
// details of Fizz change in the future. | ||
// Also we are not using `setImmediate` here because it's not available | ||
// broadly in all runtimes, for example some edge workers might not | ||
// have it. | ||
setTimeout(async () => { | ||
try { | ||
while (true) { | ||
|
@@ -312,7 +316,12 @@ export function createInlineDataStream( | |
}) | ||
} | ||
|
||
export function createSuffixStream( | ||
/** | ||
* This transform stream moves the suffix to the end of the stream, so results | ||
* like `</body></html><script>...</script>` will be transformed to | ||
* `<script>...</script></body></html>`. | ||
*/ | ||
function createMoveSuffixStream( | ||
suffix: string | ||
): TransformStream<Uint8Array, Uint8Array> { | ||
let foundSuffix = false | ||
|
@@ -364,12 +373,14 @@ export function createRootLayoutValidatorStream( | |
controller.enqueue(chunk) | ||
}, | ||
flush(controller) { | ||
const missingTags = [ | ||
foundHtml ? null : 'html', | ||
foundBody ? null : 'body', | ||
].filter(nonNullable) | ||
// If html or body tag is missing, we need to inject a script to notify | ||
// the client. | ||
if (!foundHtml || !foundBody) { | ||
const missingTags = [ | ||
foundHtml ? null : 'html', | ||
foundBody ? null : 'body', | ||
].filter(nonNullable) | ||
|
||
if (missingTags.length > 0) { | ||
controller.enqueue( | ||
encodeText( | ||
`<script>self.__next_root_layout_missing_tags_error=${JSON.stringify( | ||
|
@@ -429,18 +440,14 @@ export async function continueFromInitialStream( | |
dataStream ? createInlineDataStream(dataStream) : null, | ||
|
||
// Close tags should always be deferred to the end | ||
createSuffixStream(closeTag), | ||
createMoveSuffixStream(closeTag), | ||
|
||
// Special head insertions | ||
createHeadInsertionTransformStream(async () => { | ||
// TODO-APP: Insert server side html to end of head in app layout rendering, to avoid | ||
// hydration errors. Remove this once it's ready to be handled by react itself. | ||
const serverInsertedHTML = | ||
getServerInsertedHTML && serverInsertedHTMLToHead | ||
? await getServerInsertedHTML() | ||
: '' | ||
return serverInsertedHTML | ||
}), | ||
// TODO-APP: Insert server side html to end of head in app layout rendering, to avoid | ||
// hydration errors. Remove this once it's ready to be handled by react itself. | ||
getServerInsertedHTML && serverInsertedHTMLToHead | ||
? createHeadInsertionTransformStream(getServerInsertedHTML) | ||
: null, | ||
|
||
validateRootLayout | ||
? createRootLayoutValidatorStream( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to a separate file to keep this one simple.