Skip to content

Commit

Permalink
ai/core: allow reading the stream from streamText multiple times. (#1510
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lgrammel authored May 8, 2024
1 parent b15b8af commit d1b1880
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .changeset/late-toys-perform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'ai': patch
---

fix (ai/core): allow reading streams in streamText result multiple times
53 changes: 53 additions & 0 deletions packages/core/core/generate-text/stream-text.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,3 +439,56 @@ describe('result.toTextStreamResponse', () => {
assert.deepStrictEqual(chunks, ['Hello', ', ', 'world!']);
});
});

describe('multiple stream consumption', () => {
it('should support text stream, ai stream, full stream on single result object', async () => {
const result = await streamText({
model: new MockLanguageModelV1({
doStream: async () => {
return {
stream: convertArrayToReadableStream([
{ type: 'text-delta', textDelta: 'Hello' },
{ type: 'text-delta', textDelta: ', ' },
{ type: 'text-delta', textDelta: 'world!' },
{
type: 'finish',
finishReason: 'stop',
logprobs: undefined,
usage: { completionTokens: 10, promptTokens: 3 },
},
]),
rawCall: { rawPrompt: 'prompt', rawSettings: {} },
};
},
}),
prompt: 'test-input',
});

assert.deepStrictEqual(
await convertAsyncIterableToArray(result.textStream),
['Hello', ', ', 'world!'],
);

assert.deepStrictEqual(
await convertReadableStreamToArray(
result.toAIStream().pipeThrough(new TextDecoderStream()),
),
['0:"Hello"\n', '0:", "\n', '0:"world!"\n'],
);

assert.deepStrictEqual(
await convertAsyncIterableToArray(result.fullStream),
[
{ type: 'text-delta', textDelta: 'Hello' },
{ type: 'text-delta', textDelta: ', ' },
{ type: 'text-delta', textDelta: 'world!' },
{
type: 'finish',
finishReason: 'stop',
logprobs: undefined,
usage: { completionTokens: 10, promptTokens: 3, totalTokens: 13 },
},
],
);
});
});
20 changes: 17 additions & 3 deletions packages/core/core/generate-text/stream-text.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export type TextStreamPart<TOOLS extends Record<string, CoreTool>> =
A result object for accessing different stream types and additional information.
*/
export class StreamTextResult<TOOLS extends Record<string, CoreTool>> {
private readonly originalStream: ReadableStream<TextStreamPart<TOOLS>>;
private originalStream: ReadableStream<TextStreamPart<TOOLS>>;

/**
Warnings from the model provider (e.g. unsupported settings)
Expand Down Expand Up @@ -173,13 +173,27 @@ Response headers.
this.rawResponse = rawResponse;
}

/**
Split out a new stream from the original stream.
The original stream is replaced to allow for further splitting,
since we do not know how many times the stream will be split.
Note: this leads to buffering the stream content on the server.
However, the LLM results are expected to be small enough to not cause issues.
*/
private teeStream() {
const [stream1, stream2] = this.originalStream.tee();
this.originalStream = stream2;
return stream1;
}

/**
A text stream that returns only the generated text deltas. You can use it
as either an AsyncIterable or a ReadableStream. When an error occurs, the
stream will throw the error.
*/
get textStream(): AsyncIterableStream<string> {
return createAsyncIterableStream(this.originalStream, {
return createAsyncIterableStream(this.teeStream(), {
transform(chunk, controller) {
if (chunk.type === 'text-delta') {
// do not stream empty text deltas:
Expand All @@ -200,7 +214,7 @@ You can use it as either an AsyncIterable or a ReadableStream. When an error occ
stream will throw the error.
*/
get fullStream(): AsyncIterableStream<TextStreamPart<TOOLS>> {
return createAsyncIterableStream(this.originalStream, {
return createAsyncIterableStream(this.teeStream(), {
transform(chunk, controller) {
if (chunk.type === 'text-delta') {
// do not stream empty text deltas:
Expand Down

0 comments on commit d1b1880

Please sign in to comment.