Skip to content

Commit

Permalink
fix streaming for child and non-child use cases
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanBoyle committed Dec 28, 2024
1 parent 17705b8 commit 2521e40
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 57 deletions.
2 changes: 1 addition & 1 deletion playground/chatCompletion.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const ChatCompletion = gsx.StreamComponent<ChatCompletionProps, string>(
const result = await llm.completeStream(prompt);

return {
stream: result.stream,
stream: () => result.stream(),
value: result.value,
};
},
Expand Down
44 changes: 38 additions & 6 deletions playground/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@ async function runHNAnalysisExample() {
}

// Example 3: Streaming vs non-streaming chat completion
async function runStreamingExample() {
async function runStreamingWithChildrenExample() {
const prompt =
"Write a 250 word story about an AI that discovers the meaning of friendship through a series of small interactions with humans. Be concise but meaningful.";

console.log("\n🚀 Starting streaming example with prompt:", prompt);

console.log("\n📝 Non-streaming version (waiting for full response):");
const finalResult = await gsx.execute<string>(
<ChatCompletion prompt={prompt} />,
await gsx.execute<string>(
<ChatCompletion prompt={prompt}>
{async (response: string) => {
console.log(response);
}}
</ChatCompletion>,
);
console.log("✅ Complete response:", finalResult);

console.log("\n📝 Streaming version (processing tokens as they arrive):");
await gsx.execute(
Expand All @@ -64,10 +67,39 @@ async function runStreamingExample() {
);
}

async function runStreamingExample() {
const prompt =
"Write a 250 word story about an AI that discovers the meaning of friendship through a series of small interactions with humans. Be concise but meaningful.";

console.log("\n🚀 Starting streaming example with prompt:", prompt);

console.log("\n📝 Non-streaming version (waiting for full response):");
const finalResult = await gsx.execute<Streamable<string>>(
<ChatCompletion prompt={prompt} />,
);
console.log("✅ Complete response:", await finalResult.value);

console.log("\n📝 Streaming version (processing tokens as they arrive):");
const response: Streamable<string> = await gsx.execute(
<gsx.Stream>
<ChatCompletion prompt={prompt} />
</gsx.Stream>,
);

for await (const token of {
[Symbol.asyncIterator]: () => response.stream(),
}) {
process.stdout.write(token);
}
process.stdout.write("\n");
console.log("✅ Streaming complete");
}

// Main function to run examples
async function main() {
await runBlogWritingExample();
await runHNAnalysisExample();
// await runBlogWritingExample();
// await runHNAnalysisExample();
await runStreamingWithChildrenExample();
await runStreamingExample();
}

Expand Down
59 changes: 32 additions & 27 deletions src/jsx-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,18 @@ function isStreamable<T>(value: unknown): value is Streamable<T> {
export const jsx = <
TOutput,
TProps extends Record<string, unknown> & {
children?: (output: TOutput) => MaybePromise<JSX.Element | JSX.Element[]>;
children?:
| ((output: TOutput) => MaybePromise<JSX.Element | JSX.Element[]>)
| JSX.Element
| JSX.Element[];
},
>(
component: (props: TProps) => MaybePromise<TOutput>,
props: TProps | null,
children?: (output: TOutput) => MaybePromise<JSX.Element | JSX.Element[]>,
children?:
| ((output: TOutput) => MaybePromise<JSX.Element | JSX.Element[]>)
| JSX.Element
| JSX.Element[],
): Promise<Awaited<TOutput> | Awaited<TOutput>[]> => {
if (!children && props?.children) {
children = props.children;
Expand All @@ -59,50 +65,49 @@ export const jsx = <

// If this is a streaming result, handle it specially
if (isStreamable<TOutput>(rawResult)) {
if (!children) {
// When no children, return the value to be resolved later
return rawResult.value as Promise<Awaited<TOutput>>;
const hasChildFunction = typeof children === "function";

if (!hasChildFunction) {
// When no function children, preserve the streamable in streaming context
if (isInStreamingContext()) {
return rawResult as Awaited<TOutput>;
}
// Outside streaming context, resolve the value
return (await rawResult.value) as Awaited<TOutput>;

Check failure on line 76 in src/jsx-runtime.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

This assertion is unnecessary since it does not change the type of the expression

Check failure on line 76 in src/jsx-runtime.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

This assertion is unnecessary since it does not change the type of the expression
}

if (isInStreamingContext()) {
// In streaming context, pass the streamable to children and return their result
// No need to await the value here - the stream completion is sufficient
const childrenResult = await children(rawResult);
// In streaming context, pass the streamable to children function
const childrenResult = await (children as Function)(rawResult);

Check failure on line 81 in src/jsx-runtime.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Unsafe assignment of an `any` value

Check failure on line 81 in src/jsx-runtime.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Unsafe call of a(n) `Function` typed value

Check failure on line 81 in src/jsx-runtime.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Unsafe assignment of an `any` value

Check failure on line 81 in src/jsx-runtime.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Unsafe call of a(n) `Function` typed value
const resolvedResult = await resolveDeep(childrenResult);
return resolvedResult as Awaited<TOutput>;
} else {
// Outside streaming context, resolve the value first
const resolvedValue = await rawResult.value;
const childrenResult = await children(resolvedValue as TOutput);
const childrenResult = await (children as Function)(
resolvedValue as TOutput,
);
const resolvedResult = await resolveDeep(childrenResult);
return resolvedResult as Awaited<TOutput>;
}
}

// For non-streaming results, resolve deeply
const result = (await resolveDeep(rawResult)) as TOutput;
// For non-streaming results, resolve deeply but preserve streamables
const result = await resolveDeep(rawResult);

// If there are no children, return the resolved result
if (!children) {
// Check again after deep resolution in case we got a streamable
if (isStreamable<TOutput>(result) && isInStreamingContext()) {
return result as Awaited<TOutput>;
}

// Handle array of children (Fragment edge case)
if (Array.isArray(children)) {
const resolvedChildren = await Promise.all(
children.map(child => resolveDeep(child)),
);
return resolvedChildren as Awaited<TOutput>[];
// If there are no function children, return the resolved result
if (typeof children !== "function") {
return result as Awaited<TOutput>;
}

// Handle child function
if (typeof children === "function") {
const childrenResult = await children(result);
const resolvedResult = await resolveDeep(childrenResult);
return resolvedResult as Awaited<TOutput>;
}

// Handle single child (Fragment edge case)
const resolvedResult = await resolveDeep(children);
const childrenResult = await children(result as TOutput);
const resolvedResult = await resolveDeep(childrenResult);
return resolvedResult as Awaited<TOutput>;
})();
};
Expand Down
56 changes: 40 additions & 16 deletions src/resolve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {

import { JSX } from "./jsx-runtime";
import { isInStreamingContext } from "./stream";
import { setStreamingContext } from "./stream";

type ComponentType<P, O> = WorkflowComponent<P, O> | StreamComponent<P, O>;

Expand Down Expand Up @@ -62,8 +63,10 @@ export async function resolveDeep<T>(value: unknown): Promise<T> {
if (isInStreamingContext()) {
return value as T;
}
// Outside streaming context, resolve the value
const finalValue = await value.value;
return finalValue as T;
// Recursively resolve in case the value itself is a Streamable
return resolveDeep(finalValue);
}

// Handle arrays
Expand Down Expand Up @@ -105,22 +108,43 @@ export async function execute<T>(element: ExecutableValue): Promise<T> {
throw new Error("Cannot execute null or undefined element");
}

// Handle JSX elements specially to support children functions
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (isJSXElement(element)) {
const componentResult = await element.type(element.props);

// Handle children
if (element.props.children) {
const resolvedResult = await resolveDeep(componentResult);
const childrenResult = await element.props.children(resolvedResult);
return execute(childrenResult as ExecutableValue);
// Get initial streaming context state
const initialStreamingContext = isInStreamingContext();

try {
// Handle JSX elements specially to support children functions
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (isJSXElement(element)) {
const componentResult = await element.type(element.props);

// Check if result is streamable in streaming context first
if (isStreamable(componentResult) && isInStreamingContext()) {
if (element.props.children) {
// With children, let them handle the streamable
const childrenResult = await element.props.children(componentResult);
return execute(childrenResult as ExecutableValue);

Check failure on line 125 in src/resolve.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Returning an awaited promise is required in this context

Check failure on line 125 in src/resolve.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Returning an awaited promise is required in this context
}
// No children, preserve the streamable
return componentResult as T;
}

// Handle non-streaming cases
if (element.props.children) {
const resolvedResult = await resolveDeep(componentResult);
const childrenResult = await element.props.children(resolvedResult);
return execute(childrenResult as ExecutableValue);

Check failure on line 135 in src/resolve.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Returning an awaited promise is required in this context

Check failure on line 135 in src/resolve.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Returning an awaited promise is required in this context
}

// No children, resolve the result
return resolveDeep(componentResult);

Check failure on line 139 in src/resolve.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Returning an awaited promise is required in this context

Check failure on line 139 in src/resolve.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Returning an awaited promise is required in this context
}

// No children, just resolve the result
return resolveDeep(componentResult);
// For all other cases, use the shared resolver
return resolveDeep(element);

Check failure on line 143 in src/resolve.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Returning an awaited promise is required in this context

Check failure on line 143 in src/resolve.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Returning an awaited promise is required in this context
} finally {
// Only restore streaming context if it changed during execution
if (isInStreamingContext() !== initialStreamingContext) {
setStreamingContext(initialStreamingContext);
}
}

// For all other cases, use the shared resolver
return resolveDeep(element);
}
36 changes: 29 additions & 7 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import type { Element, ExecutableValue, StreamComponent } from "./types";
import type { Element, StreamComponent, Streamable } from "./types";

Check failure on line 1 in src/stream.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Run autofix to sort these imports!

Check failure on line 1 in src/stream.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Run autofix to sort these imports!

import { execute } from "./resolve";

// Global state to track streaming context
let isStreaming = false;

// Helper to set streaming context
export function setStreamingContext(value: boolean): void {
isStreaming = value;
}

// Helper to check if a component is a stream component
export function isStreamComponent(
component: unknown,
Expand All @@ -17,19 +22,36 @@ export function isStreamComponent(
}

// Component to enable streaming for its children
export async function Stream(props: {
export async function Stream<T>(props: {
children: Element;
}): Promise<ExecutableValue> {
const prevIsStreaming = isStreaming;
isStreaming = true;
}): Promise<T | Streamable<T>> {
const prevIsStreaming = isInStreamingContext();
setStreamingContext(true);

try {
return await execute(props.children);
const result = await execute<T | Streamable<T>>(props.children);
// If we got a streamable result, return it directly
if (isStreamable(result)) {
return result;
}
return result as T;
} finally {
isStreaming = prevIsStreaming;
// Don't restore streaming context here - it needs to persist through the outer JSX runtime
}
}

// Helper to check if something is a streamable value
function isStreamable<T>(value: unknown): value is Streamable<T> {
return (
typeof value === "object" &&
value !== null &&
"stream" in value &&
"value" in value &&
typeof (value as Streamable<T>).stream === "function" &&
value.value instanceof Promise
);
}

// Helper to check if we're in a streaming context
export function isInStreamingContext(): boolean {
return isStreaming;
Expand Down

0 comments on commit 2521e40

Please sign in to comment.