From c9874d9d0fce3d66eff510f9e761e8a9db2067b3 Mon Sep 17 00:00:00 2001 From: soap Date: Thu, 26 Sep 2024 18:43:27 +0800 Subject: [PATCH 1/5] fix(fetch): Fix memory leak when handling endless streaming This is a quick fix to address a memory overflow issue caused by the recursive approach when handling endless streams (e.g., CCTV). However, this is not a perfect solution, as this approach still does not trigger the onFinishedResolving callback for streams that never terminate. Fixes GH-13806 --- packages/utils/src/instrument/fetch.ts | 42 +++++++++++--------------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index a161b8db79bb..7c37d84354ee 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -119,33 +119,27 @@ async function resolveResponse(res: Response | undefined, onFinishedResolving: ( if (res && res.body && res.body.getReader) { const responseReader = res.body.getReader(); - // eslint-disable-next-line no-inner-declarations - async function consumeChunks({ done }: { done: boolean }): Promise { - if (!done) { - try { - // abort reading if read op takes more than 5s - const result = await Promise.race([ - responseReader.read(), - new Promise<{ done: boolean }>(res => { - setTimeout(() => { - res({ done: true }); - }, 5000); - }), - ]); - await consumeChunks(result); - } catch (error) { - // handle error if needed - } - } else { - return Promise.resolve(); + // NOTE: Still looking for a better solution to handle endless streams (e.g., CCTV). + // Currently, this implementation does not trigger onFinishedResolving + // for streams that never end, as the 'done' condition is never met. + let reading = true + while (reading) { + try { + // abort reading if read op takes more than 5s + const { done } = await Promise.race([ + responseReader.read(), + new Promise<{ done: boolean }>((resolve) => setTimeout(() => resolve({ done: true }), 5000)), + ]); + + if (done) reading = false; + } catch (error) { + // handle error if needed + reading = false; } } - return responseReader - .read() - .then(consumeChunks) - .then(onFinishedResolving) - .catch(() => undefined); + responseReader.releaseLock(); + onFinishedResolving(); } } From 335adccc168efaa452aab078c71fa6cb4f2fbf1c Mon Sep 17 00:00:00 2001 From: Ivan Chou Date: Thu, 26 Sep 2024 19:11:45 +0800 Subject: [PATCH 2/5] Update fetch.ts adjust format --- packages/utils/src/instrument/fetch.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index 7c37d84354ee..4f2555633ec2 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -122,13 +122,13 @@ async function resolveResponse(res: Response | undefined, onFinishedResolving: ( // NOTE: Still looking for a better solution to handle endless streams (e.g., CCTV). // Currently, this implementation does not trigger onFinishedResolving // for streams that never end, as the 'done' condition is never met. - let reading = true + let reading = true; while (reading) { try { // abort reading if read op takes more than 5s const { done } = await Promise.race([ responseReader.read(), - new Promise<{ done: boolean }>((resolve) => setTimeout(() => resolve({ done: true }), 5000)), + new Promise<{ done: boolean }>(resolve => setTimeout(() => resolve({ done: true }), 5000)), ]); if (done) reading = false; From d20c03b01694da53fc08c429d37eec0a60389857 Mon Sep 17 00:00:00 2001 From: Luca Forstner Date: Wed, 2 Oct 2024 12:51:39 +0000 Subject: [PATCH 3/5] Free usage of reader and and define max timeout of 90s --- packages/utils/src/instrument/fetch.ts | 59 ++++++++++++++++++-------- 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index 4f2555633ec2..5ac2871b9689 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -116,34 +116,56 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat } async function resolveResponse(res: Response | undefined, onFinishedResolving: () => void): Promise { - if (res && res.body && res.body.getReader) { - const responseReader = res.body.getReader(); - - // NOTE: Still looking for a better solution to handle endless streams (e.g., CCTV). - // Currently, this implementation does not trigger onFinishedResolving - // for streams that never end, as the 'done' condition is never met. - let reading = true; - while (reading) { + if (res && res.body) { + const body = res.body; + const responseReader = body.getReader(); + + // Define a maximum duration after which we just cancel + const maxFetchDurationTimeout = setTimeout( + () => { + body.cancel().then(null, () => { + // noop + }); + }, + 90 * 1000, // 90s + ); + + let readingActive = true; + while (readingActive) { + let chunkTimeout; try { // abort reading if read op takes more than 5s - const { done } = await Promise.race([ - responseReader.read(), - new Promise<{ done: boolean }>(resolve => setTimeout(() => resolve({ done: true }), 5000)), - ]); + chunkTimeout = setTimeout(() => { + body.cancel().then(null, () => { + // noop on error + }); + }, 5000); + + const { done } = await responseReader.read(); - if (done) reading = false; + clearTimeout(chunkTimeout); + + if (done) { + onFinishedResolving(); + readingActive = false; + } } catch (error) { - // handle error if needed - reading = false; + readingActive = false; + } finally { + clearTimeout(chunkTimeout); } } + clearTimeout(maxFetchDurationTimeout); + responseReader.releaseLock(); - onFinishedResolving(); + responseReader.cancel().then(null, () => { + // noop on error + }); } } -async function streamHandler(response: Response): Promise { +function streamHandler(response: Response): void { // clone response for awaiting stream let clonedResponseForResolving: Response; try { @@ -152,7 +174,8 @@ async function streamHandler(response: Response): Promise { return; } - await resolveResponse(clonedResponseForResolving, () => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + resolveResponse(clonedResponseForResolving, () => { triggerHandlers('fetch-body-resolved', { endTimestamp: timestampInSeconds() * 1000, response, From 91396d06cded81849f13ba8c0ace4edf3d0ff00b Mon Sep 17 00:00:00 2001 From: Luca Forstner Date: Wed, 2 Oct 2024 13:52:44 +0000 Subject: [PATCH 4/5] Update tests --- .../test-applications/react-router-6/tests/sse.test.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dev-packages/e2e-tests/test-applications/react-router-6/tests/sse.test.ts b/dev-packages/e2e-tests/test-applications/react-router-6/tests/sse.test.ts index 92c06543c0b8..942e67ca4551 100644 --- a/dev-packages/e2e-tests/test-applications/react-router-6/tests/sse.test.ts +++ b/dev-packages/e2e-tests/test-applications/react-router-6/tests/sse.test.ts @@ -45,7 +45,6 @@ test('Waits for sse streaming when sse has been explicitly aborted', async ({ pa await fetchButton.click(); const rootSpan = await transactionPromise; - console.log(JSON.stringify(rootSpan, null, 2)); const sseFetchCall = rootSpan.spans?.filter(span => span.description === 'sse fetch call')[0] as SpanJSON; const httpGet = rootSpan.spans?.filter(span => span.description === 'GET http://localhost:8080/sse')[0] as SpanJSON; @@ -71,7 +70,7 @@ test('Waits for sse streaming when sse has been explicitly aborted', async ({ pa expect(consoleBreadcrumb?.message).toBe('Could not fetch sse AbortError: BodyStreamBuffer was aborted'); }); -test('Aborts when stream takes longer than 5s', async ({ page }) => { +test('Aborts when stream takes longer than 5s, by not updating the span duration', async ({ page }) => { await page.goto('/sse'); const transactionPromise = waitForTransaction('react-router-6', async transactionEvent => { @@ -102,5 +101,5 @@ test('Aborts when stream takes longer than 5s', async ({ page }) => { const resolveBodyDuration = Math.round((httpGet.timestamp as number) - httpGet.start_timestamp); expect(resolveDuration).toBe(0); - expect(resolveBodyDuration).toBe(7); + expect(resolveBodyDuration).toBe(0); }); From 1846cd4b72a44b5e330b42e4d2fa1ed52c144643 Mon Sep 17 00:00:00 2001 From: Luca Forstner Date: Wed, 2 Oct 2024 14:00:52 +0000 Subject: [PATCH 5/5] cancel body instead of reader and add comment --- packages/utils/src/instrument/fetch.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index 5ac2871b9689..ad28edf81e3f 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -141,6 +141,7 @@ async function resolveResponse(res: Response | undefined, onFinishedResolving: ( }); }, 5000); + // This .read() call will reject/throw when we abort due to timeouts through `body.cancel()` const { done } = await responseReader.read(); clearTimeout(chunkTimeout); @@ -159,7 +160,7 @@ async function resolveResponse(res: Response | undefined, onFinishedResolving: ( clearTimeout(maxFetchDurationTimeout); responseReader.releaseLock(); - responseReader.cancel().then(null, () => { + body.cancel().then(null, () => { // noop on error }); }