Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamingTextResponse not compatible with Remix action function #199

Closed
cephalization opened this issue Jun 22, 2023 · 36 comments
Closed

StreamingTextResponse not compatible with Remix action function #199

cephalization opened this issue Jun 22, 2023 · 36 comments

Comments

@cephalization
Copy link

cephalization commented Jun 22, 2023

When using the package in an action, like so,

export const action = async ({ request }: ActionArgs) => {
    const { messages, transcriptId } = z
        .object({
            messages: z.array(
                z
                    .object({
                        content: z.string(),
                        role: z.union([z.literal('user'), z.literal('system'), z.literal('assistant')]),
                    })
                    .passthrough(),
            ),
            transcriptId: z.string(),
        })
        .parse(await request.json());

    // Load transcript set from Redis, turn it into a string
    const transcriptArray = await transcriptQueries.getTranscriptArray(redisClient, { transcriptKey: transcriptId });
    const transcript = transcriptArray;
    const prompt = `<my prompt>`;
    const content = prompt.replace('{transcript}', constrainLinesToTokenLimit(transcript, prompt, 6000).join('\n'));

    const newMessages = [
        {
            role: 'system',
            content,
        },
        ...messages,
    ] as const;

    const response = await openai.createChatCompletion({
        // model: 'gpt-3.5-turbo-16k',
        model: 'gpt-4',
        // @ts-expect-error
        messages: newMessages,
        stream: true,
    });
    const stream = OpenAIStream(response);

    return new StreamingTextResponse(stream);
};

The action returns an internal error

TypeError: First parameter has member 'readable' that is not a ReadableStream.
dashboard:dev:     at assertReadableStream (/Users/tony/repos/teno/node_modules/web-streams-polyfill/src/lib/validators/readable-stream.ts:5:11)
dashboard:dev:     at convertReadableWritablePair (/Users/tony/repos/teno/node_modules/web-streams-polyfill/src/lib/validators/readable-writable-pair.ts:15:3)
dashboard:dev:     at ReadableStream.pipeThrough (/Users/tony/repos/teno/node_modules/web-streams-polyfill/src/lib/readable-stream.ts:211:23)
dashboard:dev:     at AIStream (/Users/tony/repos/teno/node_modules/ai/dist/index.js:147:29)
dashboard:dev:     at OpenAIStream (/Users/tony/repos/teno/node_modules/ai/dist/index.js:170:10)
dashboard:dev:     at action7 (/Users/tony/repos/teno/apps/dashboard/app/routes/api.chat.ts:68:17)
dashboard:dev:     at processTicksAndRejections (node:internal/process/task_queues:95:5)
dashboard:dev:     at Object.callRouteActionRR (/Users/tony/repos/teno/node_modules/@remix-run/node/node_modules/@remix-run/server-runtime/dist/data.js:35:16)
dashboard:dev:     at callLoaderOrAction (/Users/tony/repos/teno/node_modules/@remix-run/router/router.ts:3568:16)
dashboard:dev:     at submit (/Users/tony/repos/teno/node_modules/@remix-run/router/router.ts:2835:16)
dashboard:dev:     at queryImpl (/Users/tony/repos/teno/node_modules/@remix-run/router/router.ts:2770:22)
dashboard:dev:     at Object.queryRoute (/Users/tony/repos/teno/node_modules/@remix-run/router/router.ts:2720:18)
dashboard:dev:     at handleResourceRequestRR (/Users/tony/repos/teno/node_modules/@remix-run/node/node_modules/@remix-run/server-runtime/dist/server.js:251:20)
dashboard:dev:     at requestHandler (/Users/tony/repos/teno/node_modules/@remix-run/node/node_modules/@remix-run/server-runtime/dist/server.js:59:18)
dashboard:dev:     at /Users/tony/repos/teno/node_modules/@remix-run/express/dist/server.js:39:22

This appears to be due to some web streams workaround that remix installs. It even occurs when hosted on vercel. This error does not occur if I move all of this code into a dedicated express api but that defeats the purpose of me using remix as it would be my only external api call.

Anyone had luck fixing this? I've tried implementing the solutions here MattiasBuelens/web-streams-polyfill#93 with no luck.

I will try to produce a reproduction repo soon, but using any remix template should immediately reproduce the issue.

@cephalization
Copy link
Author

@cephalization
Copy link
Author

I had to perform some unspeakable acts, but this package can be made compatible with remix actions now.

import { createEventStreamTransformer, trimStartOfStreamHelper } from 'ai';
import { ReadableStream as PolyfillReadableStream } from 'web-streams-polyfill';
import { createReadableStreamWrapper } from '@mattiasbuelens/web-streams-adapter';

// @ts-expect-error bad types
const toPolyfillReadable = createReadableStreamWrapper(PolyfillReadableStream);
const toNativeReadable = createReadableStreamWrapper(ReadableStream);

export class StreamingTextResponse extends Response {
	constructor(res: ReadableStream, init?: ResponseInit) {
		const headers: HeadersInit = {
			'Content-Type': 'text/plain; charset=utf-8',
			...init?.headers,
		};
		super(res, { ...init, status: 200, headers });
		this.getRequestHeaders();
	}

	getRequestHeaders() {
		return addRawHeaders(this.headers);
	}
}

function addRawHeaders(headers: Headers) {
	// @ts-expect-error shame on me
	headers.raw = function () {
		const rawHeaders = {};
		const headerEntries = headers.entries();
		for (const [key, value] of headerEntries) {
			const headerKey = key.toLowerCase();
			if (rawHeaders.hasOwnProperty(headerKey)) {
				// @ts-expect-error shame on me
				rawHeaders[headerKey].push(value);
			} else {
				// @ts-expect-error shame on me
				rawHeaders[headerKey] = [value];
			}
		}
		return rawHeaders;
	};
	return headers;
}

function parseOpenAIStream(): (data: string) => string | void {
	const trimStartOfStream = trimStartOfStreamHelper();
	return (data) => {
		// TODO: Needs a type
		const json = JSON.parse(data);

		// this can be used for either chat or completion models
		const text = trimStartOfStream(json.choices[0]?.delta?.content ?? json.choices[0]?.text ?? '');

		return text;
	};
}

export function OpenAIStream(response: Response): ReadableStream<any> {
	if (!response.ok || !response.body) {
		throw new Error(`Failed to convert the response to stream. Received status code: ${response.status}.`);
	}

	const responseBodyStream = toPolyfillReadable(response.body);

	// @ts-expect-error bad types
	return toNativeReadable(responseBodyStream.pipeThrough(createEventStreamTransformer(parseOpenAIStream())));
}

Just use this OpenAIStream instead of the one returned from ai and also return this StreamingTextResponse from your action and it will work.

There were two key issues:

  • OpenAIStream uses pipeThrough, which does not work in remix without patching the polyfill that remix server uses
  • StreamingTextResponse does not include request.headers.raw(), which @vercel/remix uses when deployed on vercel

Using these two patched functions in place of the ai ones used in the examples makes everything work in vercel!

@MaxLeiter
Copy link
Member

MaxLeiter commented Jul 3, 2023

Hi all,

After some research / asking around, I believe this is a remix issue rather than an SDK one (but please let me know if you disagree!)

Basically Remix uses node-fetch and we're using the spec compliant fetch implementation. If you use the edge runtime on Vercel it should work:

export const config = { runtime: "edge" };

You shouldn't need to use the edge runtime though (even though we recommend it) so I'll look into the appropriate way to handle this. It seems like we'll either need to add polyfills to vercel/remix or submit something upstream.

@cephalization
Copy link
Author

It would be really funny if this just worked 😅

I tried this while the action was on the same route as the component and it did not work. Let me try with the action on a dedicated resource route

@cephalization
Copy link
Author

This does not work for local dev, which is a non starter for me unfortunately

@AvidDabbler
Copy link

This is the same for HuggingFaceStream. Would love for this to work.

@cephalization if you have any workarounds or end up using something else would love to know and would I can do the same for you 😄

@cephalization
Copy link
Author

My workaround reimplementation above works on local and hosted so I am happy with it for now. We will see if that changes when I try to upgrade the package in the future 😂

@MaxLeiter
Copy link
Member

@cephalization
Copy link
Author

@AvidDabbler copy paste the code from this comment into a new file, then import and use StreamingTextResponse from your new file.

To get huggingfacestream working, you can likely tweak the openaistream I have shown with whatever hugging face specific logic there is, if any. Checkout the source code from this repo and see what you can make happen

@AvidDabbler
Copy link

@AvidDabbler copy paste the code from this comment into a new file, then import and use StreamingTextResponse from your new file.

To get huggingfacestream working, you can likely tweak the openaistream I have shown with whatever hugging face specific logic there is, if any. Checkout the source code from this repo and see what you can make happen

IDK how i missed that 🤦

@yarapolana
Copy link

@cephalization Hey how is your client side?
I get Could not parse content as FormData.
And when I use the alternative await request.json()

I get a HTML stream as if you were doing a get request.

@cephalization
Copy link
Author

@yarapolana

My action looks just like it does at the beginning of this post. It is on its own route called routes/api.chat.ts and in the frontend code, that uses the useChat hook, I give it the api location of /api/chat

I am using ai@^2.1.7 if it helps

@yarapolana
Copy link

yarapolana commented Jul 14, 2023

@yarapolana

My action looks just like it does at the beginning of this post. It is on its own route called routes/api.chat.ts and in the frontend code, that uses the useChat hook, I give it the api location of /api/chat

I am using ai@^2.1.7 if it helps

Tried for two days to work around it, and downgraded ai lib but I still get the full html as response.
To clarify the messages from usechat give me
<!doctype html ...etc

and that html response does not include the actual streaming values.
just styling/meta/head etc

I am using ChakraUI

Im going to try this on a new project and see what happens.

Thanks for sharing though.

@cephalization
Copy link
Author

@yarapolana I can give you my code snippets.

Here is my usage of the chat hook

        const { messages, append, reload, stop, isLoading, input, setInput } = useChat({
		api: '/api/chat',
		headers: {
			'Content-Type': 'application/json',
		},
		initialMessages,
		id,
		body: {
			id,
			transcriptId: meeting.transcript.redisKey,
		},
	});

Here is my /api/chat route

import type { ActionArgs } from '@vercel/remix';
import type { ChatCompletionRequestMessage } from 'openai-edge';
import { Configuration, OpenAIApi } from 'openai-edge';
import { z } from 'zod';
import { countMessageTokens, optimizeTranscriptModel } from 'llm';

import { transcriptQueries, redis } from '@/server/kv.server';
import { OpenAIStream, StreamingTextResponse } from '@/server/streamingTextResponse.server';
import { checkAuth } from '@/server/auth.utils.server';

const oConfig = new Configuration({
	apiKey: process.env.OPENAI_API_KEY ?? '',
});
const openai = new OpenAIApi(oConfig);

export const action = async ({ request }: ActionArgs) => {
	checkAuth(request);

	const { messages, transcriptId } = z
		.object({
			messages: z.array(
				z
					.object({
						content: z.string(),
						role: z.union([z.literal('user'), z.literal('system'), z.literal('assistant')]),
					})
					.passthrough(),
			),
			transcriptId: z.string(),
		})
		.parse(await request.json());

	// Load transcript set from Redis, turn it into a string
	const transcriptArray = await transcriptQueries.getTranscriptArray(redis, { transcriptKey: transcriptId });
	const transcript = transcriptArray;
	const prompt = `HERE IS MY PROMPT`;
	const { model, shortenedTranscript } = optimizeTranscriptModel(transcript, {
		promptTokenBuffer: countMessageTokens(prompt),
	});
	const content = prompt.replace('{transcript}', shortenedTranscript.join('\n'));

	const newMessages: ChatCompletionRequestMessage[] = [
		{
			role: 'system',
			content,
		},
		...messages,
	];

	const response = await openai.createChatCompletion({
		model,
		messages: newMessages,
		stream: true,
	});
	const stream = OpenAIStream(response);

	const sResponse = new StreamingTextResponse(stream);

	return sResponse;
};

@yarapolana
Copy link

yarapolana commented Jul 14, 2023

@cephalization
Ah I am embarrassed right now, didn't think of using the action inside a specific api route, had it with all the rest of the page. Its working fine 😅

@cephalization
Copy link
Author

@yarapolana I ran into the same issue! very confusing. I think actions in a page route behave slightly differently than on their own route

@Klingefjord
Copy link

Hope this gets fixed soon!

@cephalization
Copy link
Author

I haven't tested this yet but the ServerNodePolyfillOptions added in https://github.com/remix-run/remix/releases/tag/remix%401.19.0 look promising

@cephalization
Copy link
Author

Nope, that option does not appear to do anything useful in local dev at least

@mindblight
Copy link

@cephalization I'm actually not quite sure how your earlier example works. I just upgraded to 1.19 to try turning of the polyfills - same issue :(.

The server-side polyfill seems to override the native ReadableStream. Here's what I'm seeing:

import {
  ReadableStream as PolyfillReadableStream,
  TransformStream as PolyfillTransformStream,
  WritableStream as PolyfillfWritableStream,
} from "web-streams-polyfill/ponyfill";
console.log("api.ask-question");

console.log("read", ReadableStream === PolyfillReadableStream); // true
console.log("transform", TransformStream === PolyfillTransformStream); // false
console.log("write", WritableStream === PolyfillfWritableStream); // true

One other weird thing. This works fine:

return new Response(response.body, {
  headers: {
    "Content-Type": "text/event-stream",
  },
});

This hangs:

return new Response(new ReadableStream(response.body), {
    headers: {
      "Content-Type": "text/event-stream",
    },
  });

I've tried playing around with direct imports too, since all supported node versions have an official implementation (import { ReadableStream} from 'node:stream/web'). No luck so far. But, that's also due to hanging. If I can figure out what's causing the polyfill to hang, that might translate to the official implementation too.

@cephalization
Copy link
Author

@mindblight note for my example to work, you need to reimplement OpenAIStream and StreamingTextResponse like I've done at the top of this issue.

This "fix" still works for the latest version of ai but you miss out on lots of fixes and features since my patch was written a while ago.

Hoping for some action on the remix side soon.

@mindblight
Copy link

@cephalization Ah, sorry - I was unclear. I'm not actually using the 'ai' library - I just use fetch to hit OpenAI's api directly for streaming. However, I am using remix, which means I'm running into the same polyfill issue. I got linked here from an issue in the remix repo, and you seem like you're the only person who's come up with an actual solution :p.

Rewriting it makes sense. Here's the line that's confusing to me:

// @ts-expect-error bad types
const toPolyfillReadable = createReadableStreamWrapper(PolyfillReadableStream);
const toNativeReadable = createReadableStreamWrapper(ReadableStream);

From what I can tell, remix globally injects PolyfillReadableStream and overrides ReadableStream (See above). I'm pretty sure they're the same object. Doing import { ReadableStream as NodeReadableStream } from 'node:streams/web' imports the actual stream implementation that was originally overridden.

I'd like to find a slightly more generic solution that doesn't require rewriting specific stream implementations. I've tried both new PolyfillReadableStream(response.body) and new NodeReadableStream(response.body). Both hang :/. I think that if I just write an implementation of ReadableStream that works with response.body, then we can just do new RemixStream(response.body).pipeThrough(...) like normal.

I thought NodeReadableStream would handle this, so I'm a little confused why that's not working as well :/. I'm gonna experiment with creating a RemixStream that implements NodeReadableStream, plus trying to override the injected polyfills. I'll let you know if I have any luck

@mindblight
Copy link

Alright, I figured something out:

// stream-compat.server.ts
import {
  ReadableStream as NodeReadableStream,
  TextDecoderStream as NodeTextDecoderStream,
  TextEncoderStream as NodeTextEncoderStream,
  TransformStream as NodeTransformStream,
  WritableStream as NodeWritableStream,
} from "node:stream/web";

async function* iterStream<T>(body?: ReadableStream<T> | null) {
  if (!body) {
    return;
  }
  const reader = body.getReader();

  while (true) {
    const r = await reader.read();
    if (r.done) {
      break;
    } else {
      yield r.value;
    }
  }
}

// Ensures that custom streams are compatible with node streams
(global as any).ReadableStream = NodeReadableStream;
(global as any).TransformStream = NodeTransformStream;
(global as any).WritableStream = NodeWritableStream;
(global as any).TextDecoderStream = NodeTextDecoderStream;
(global as any).TextEncoderStream = NodeTextEncoderStream;

export const toNodeReadableStream = <T>(
  polyStream: ReadableStream<T> | null | undefined
) => {
  const iterator = iterStream(polyStream);

  return new NodeReadableStream<T>({
    async pull(controller) {
      const r = await iterator.next();
      if (r.done) {
        controller.close();
      } else {
        controller.enqueue(r.value);
      }
    },
    // This is *mostly* correct. There are slight type issues between the two streams
  }) as ReadableStream<T>;
};

export const StreamCompat = {
  toReadable: toNodeReadableStream,
  // lib.dom.d.ts and web.d.ts typings are apparently incompatible.
  // Remix routes use web.d.ts and .server.ts files use lib.dom.d.ts
  // This exposes lib.dom.d.ts typings for the routes
  TextEncoderStream,
  TextDecoderStream,
};

In an action, you can now do

StreamCompat.toReadable(response.body).pipeThrough(...)

And everything should JustWork™️. In theory, you can now use the official ai stream rather than re-writing it :). I'm using TextDecoderStream with a number of custom streams to make handling the data easier, so I wanted to be able to use and define whatever I feel like.

I'm gonna cross-post on the issue in the remix-run. Hope this helps someone!

@cephalization
Copy link
Author

@mindblight will give this a try later, thanks! Was really hoping to avoid using my patch at work. This looks more maintainable

@yarapolana
Copy link

This is great stuff @mindblight. you and @cephalization seem to be "on the money".

Any way to tag the remix team here?

@Klingefjord
Copy link

Any news?

@MaxLeiter
Copy link
Member

MaxLeiter commented Aug 15, 2023

@Klingefjord have you tried something like #199 (comment)?

@mindblight
Copy link

@yarapolana I posted this in the remix issue that led me here. I haven't heard anything. The Remix 2 roadmap includes dropping polyfills by default, so I suspect that we won't get an official fix until that's released. Once the polyfills are gone, then this is no longer a problem

@giuseppeg
Copy link

giuseppeg commented Sep 25, 2023

Ok I figured out a better workaround.

The issue is with OpenAI using node-fetch or Remix using @remix-run/web-fetch which uses @remix-run/web-stream which uses a polyfilled version of ReadableStream which has a check on an internal polyfill property in pipeThrough.

When the OpenAI client returns a response, Vercel's AI passes a standard non-polyfilled TransformStream to responseBodyStream.pipeThrough() which runs some checks on it. Since the TransformStream is not polyfilled the checks don't pass.

The solution

Remove these polyfills in @remix-run/web-fetch remix-run/web-std-io#42

If the issue persists after the polyfills removal, OpenAI's package might also be causing it since they use node-fetch. In that case you can pass a custom fetch to the OpenAI client:

  const openai = new OpenAI({
    apiKey: process.env.apiKey,
    organization: process.env.organization,
    fetch: globalThis.fetch
  });

The temporary workaround

Since the problem is with TransformStream you can replace the global one with the polyfilled one and use a custom StreamingTextResponse that adds raw headers:

import OpenAI from "openai";
import { OpenAIStream } from "ai";

export const action = async ({ request }: ActionArgs) => {
  const { messages } = await request.json();
  const openai = new OpenAI({
    apiKey: process.env.apiKey,
    organization: process.env.organization,
  });

  // This is the fix
  const { TransformStream } = await import("web-streams-polyfill");
  globalThis.TransformStream = TransformStream;

  const response = await openai.chat.completions.create({
    stream: true,
    model: "gpt-3.5-turbo",
    temperature: 1,
    messages,
  });

  const stream = OpenAIStream(response);
  return new StreamingTextResponse(stream);
}

StreamingTextResponse:

import { StreamingTextResponse as _StreamingTextResponse } from "ai";

// vercel/ai's StreamingTextResponse does not include request.headers.raw()
// which @vercel/remix uses when deployed on vercel.
// Therefore we use a custom one.
export class StreamingTextResponse extends _StreamingTextResponse {
  constructor(res: ReadableStream, init?: ResponseInit) {
    super(res, init);
    this.getRequestHeaders();
  }

  getRequestHeaders() {
    return addRawHeaders(this.headers);
  }
}

const addRawHeaders = function addRawHeaders(headers: Headers) {
  // eslint-disable-next-line @typescript-eslint/ban-ts-comment
  // @ts-ignore
  headers.raw = function () {
    const rawHeaders: { [k in string]: string[] } = {};
    const headerEntries = headers.entries();
    for (const [key, value] of headerEntries) {
      const headerKey = key.toLowerCase();
      // eslint-disable-next-line no-prototype-builtins
      if (rawHeaders.hasOwnProperty(headerKey)) {
        rawHeaders[headerKey].push(value);
      } else {
        rawHeaders[headerKey] = [value];
      }
    }
    return rawHeaders;
  };
  return headers;
};

@Klingefjord
Copy link

So Remix v2 is officially out now, wherein this is no longer an issue 🎉

@joelriveradev
Copy link

joelriveradev commented Mar 8, 2024

I don't think this issue is fixed because I'm also getting TypeError: nodeResponse.headers.raw is not a function when I return new StreamingTextResponse(OpenAIStream(response)) from an action inside a resource route in remix. I'm on remix version 2.7.2, and (vercel) ai version 2.2.37.

On localhost it works perfectly fine. The issue is when I deploy to vercel.

Screenshot 2024-03-08 at 3 38 30 PM

@Klingefjord
Copy link

I don't think this issue is fixed because I'm also getting TypeError: nodeResponse.headers.raw is not a function when I return new StreamingTextResponse(OpenAIStream(response)) from an action inside a resource route in remix. I'm on remix version 2.7.2, and (vercel) ai version 2.2.37.

On localhost it works perfectly fine. The issue is when I deploy to vercel.

Screenshot 2024-03-08 at 3 38 30 PM

Same for me. Might be a problem with Vercel adapter

@hzhu
Copy link

hzhu commented Jun 12, 2024

I'm pretty sure the root problem lies within the Vercel adapter. Specifically, here: packages/vercel-remix/server.ts#L105. What do you think @TooTallNate, should one of us create a PR?

EDIT: I was also getting the same error as @joelriveradev, but @MaxLeiter's solution worked for me! 🚀

@lgrammel
Copy link
Collaborator

Not sure if this helps, but the AI SDK uses the Node standard fetch.

Have you tried setting the following in vite.config.js?

installGlobals({ nativeFetch: true })

More info: https://remix.run/docs/en/main/guides/single-fetch

@rushil-patel
Copy link

rushil-patel commented Oct 1, 2024

I have a remix app running on vercel, and was running into this issue trying to use the streamText method. What finally ended up working for me is the following:

upgrade node to v20.x previously I was on v18.20.3
I guess the difference here is that in v18 fetch is still experimental
With just the above had my packages at:

  "@vercel/remix": "2.12.0",
  "ai": "3.4.7",
  "@remix-run/node": "^2.10.2",
  "@remix-run/react": "^2.10.2",
  "@remix-run/serve": "^2.10.2"

According to the remix 2.9.0 if you're on node 20.x you need should remove the installGlobals call in `vite.config.js, so I did.

At this point, when running remix vite:dev locally streaming was working but I would run into this TypeError: First parameter has member 'readable' that is not a ReadableStream. error when deployed to vercel.

What got it working on a vercel deploy was to provide the undici fetch to the openai provider, like this:

import { fetch } from "undici";
import { createOpenAI } from "@ai-sdk/openai";
import { streamText } from "ai";
....
const openAI = createOpenAI({
      fetch: fetch,
});
const result = await streamText({
     model: openAI('gpt-4o')
     .....
})
....

For anyone who is running into this issue, with streamText hope this helps

@anthonyringoet
Copy link

Can confirm the explicit passing of undici to createOpenAI fixed it. Only removing installGlobals did not fix it for me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests