Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add c.stream() #1437

Merged
merged 26 commits into from
Sep 16, 2023
Merged

feat: Add c.stream() #1437

merged 26 commits into from
Sep 16, 2023

Conversation

sor4chi
Copy link
Contributor

@sor4chi sor4chi commented Sep 11, 2023

Hi, @yusukebe, @ geelen

I implemented c.stream() according to #914. I'd like to see @ geelen added as a co-author as well, since I'm pretty much quoting the code from the issue.

This is the first time I've use a StreamAPI, so feel free to point out any mistakes.

@yusukebe asked me to give some examples of using c.stream().
Below are some examples.

Usecase

1: ChatGPT Proxy

Enable rate limiting to protect or hide external APIs

Code
import OpenAI from 'openai'
import { Hono } from 'hono'

const app = new Hono<{
  Bindings: {
    OPENAI_API_KEY: string
  }
}>()

const PROMPT = (message: string) => [
  {
    role: 'system' as const,
    content:
      'You are an Web developer. If you receive a question about Web-related technologies, you can answer it.',
  },
  {
    role: 'user' as const,
    content: message,
  },
]

app.post('/', async (c) => {
  const body = await c.req.json()
  const openai = new OpenAI({ apiKey: c.env.OPENAI_API_KEY })

  return c.stream(async (stream) => {
    const chatStream = await openai.chat.completions.create({
      messages: PROMPT(body.message),
      model: 'gpt-3.5-turbo',
      stream: true,
    })

    for await (const message of chatStream) {
      stream.write(message.choices[0]?.delta.content || '').flush()
    }
  })
})

export default app

2: Read some large files using stream

In this example, I'm reading a large file from the disk and sending it to the client using stream.
eg: Read Geojson using stream and gradually draw

Code
import * as fs from 'fs'
import { serve } from '@hono/node-server'
import { Hono } from 'hono'

const app = new Hono()
app.get('/assets/:key', async (c) => {
  const key = c.req.param('key')
  const path = `./assets/${key}`
  const stream = fs.createReadStream(path)
  return c.stream(async (s) => {
    for await (const chunk of stream) {
      s.write(chunk).flush()
    }
  })
})

serve(app)

Author should do the followings, if applicable

  • Add tests
  • Run tests
  • yarn denoify to generate files for Deno

@geelen
Copy link
Contributor

geelen commented Sep 11, 2023

Hey @sor4chi thanks for taking this up! I'd added a few more convenience methods to my own StreamingApi implementation, which you might be interested in including:

// Lets you easily emit data in JSON lines format
log(obj: any) {
  return this.writeln(JSON.stringify(obj)).flush();
}

// Lets you send a ReadableStream directly, e.g. from a subrequest
async pipe(body: ReadableStream) {
  this.writer.releaseLock();
  await body.pipeTo(this.writable, { preventClose: true });
  this.writer = this.writable.getWriter();
}

My only thought is whether this should be called c.textStream or something, since everything is based around strings so far? But I'll leave that up to @yusukebe to decide. Maybe this implementation can be made generic to allow both strings and binary data? Afaik, it's just the TextEncoder and the "content-type": "text/plain" header that makes it string-y. Thoughts?

}

write(str: string): StreamingApi {
this.buffer.push(str)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to buffer here into memory or just write directly to the stream? If the latter, I don't think you'd even need .flush() at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @geelen, thanks for your review.
I found a flush function in @geelen 's reference implementation, so I implemented this to create a buffer until then so that the stream is written at the timing when the flush actually occurs.
Surely, I think it is unnecessary because the user can buffer the stream before writing to it.
Would it be OK to send it when write or writeln is called?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was testing something at the time with Cloudflare Workers and found that it was only streaming in 4096 byte chunks, so I made flush send a bunch of bytes of whitespace to force it to send 🤫

I think I was doing something wrong, though, since when I tried again, chunks were streamed much more rapidly so I think it's fine to send them as soon as write is called. A user can always buffer on their own side before sending it...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I'll fix this in the next commit!

@yusukebe
Copy link
Member

Hi @sor4chi!

Awesome! This will be set to be a highlight feature for our next minor update, v3.7.0.

I haven't looked closely the implementation yet, but my primary concerns are:

  • Naming: As @geelen's mentioned, c.textStream() might be better if it supports only "content-type": "text/plain".
  • Focus: If there's potential to make this feature more general – like creating a stream that supports binary data – it would be great. In that case, we could name it c.stream().

I'll be reviewing your implementation more closely later. Either way, this is an important feature, so take our time making it.

@sor4chi
Copy link
Contributor Author

sor4chi commented Sep 13, 2023

Hello @geelen, @yusukebe.
It took me a while, but I changed the interface based on your suggestion.
How do you like it?

@yusukebe
Copy link
Member

yusukebe commented Sep 14, 2023

@sor4chi

Great job! There are two points I'd like us to consider:

streamText()

While c.stream() can handle all cases, it's not necessity for c.textStream(). But, it could be useful, because when we often use text/plain for debugging. I think naming it c.streamText() might be more consistent, given the prefix matches c.stream().

And if you create a c.streamText(), you can do it without redundant:

  streamText = (
    cb: (stream: StreamingApi) => Promise<void>,
    arg?: StatusCode | ResponseInit,
    headers?: HeaderRecord
  ): Response => {
    headers = headers ?? {}
    headers['content-type'] = 'text/plain; charset=UTF-8'
    headers['x-content-type-options'] = 'nosniff'
    return this.stream(cb, arg ?? 200, headers)
  }

Note: Including x-content-type-options: nosniff would be beneficial when accessing via a web browser.

stream.wait()

Though it is also not necessary, stream.wait() could be helpful:

export class StreamingApi {
  //...
  wait(ms: number) {
    return new Promise((resolve) => setTimeout(resolve, ms))
  }
}

Sample

Combining all features, the code looks much cleaner:

app.get('/', (c) => {
  return c.streamText(async (stream) => {
    while (true) {
      const message = `It is ${new Date().toISOString()}`
      stream.writeln(message)
      await stream.wait(1000)
    }
  })
})

Cool!

@sor4chi
Copy link
Contributor Author

sor4chi commented Sep 14, 2023

@yusukebe
Thank you for pointing this out.
It certainly seems like a good idea to align stream/streamText and prefix from a completion perspective.
As for the wait method, it would certainly be useful.

Is it like leaving it up to the user whether to add transfer-encoding: chunked or not?
Also, should I put back charset=UTF-8?

@yusukebe
Copy link
Member

@sor4chi

Is it like leaving it up to the user whether to add transfer-encoding: chunked or not?

Yes, we have to keep it because I think it should be chunked for a streaming response.

should I put back charset=UTF-8?

I've suggested that because of the compatibility for c.text():

this._pH['content-type'] = 'text/plain; charset=UTF-8'

I'm planning making 'text/plain; charset=UTF-8' as a variable to reduce the bundle size, but I'll do that later.

deno_dist/utils/stream.ts Outdated Show resolved Hide resolved
@yusukebe
Copy link
Member

Ah, we can refer to the Vercel's AI sdk: https://sdk.vercel.ai/docs

@arnaudbzn
Copy link

arnaudbzn commented Sep 15, 2023

Back-pressure and cancellation management (detailed in Vercel AI SDK here).

A minimalist code I'm using now with Hono and Server Sent Event message format

const completionStream = await openai.chat.completions.create(....)
const completionStreamIterator = completionStream[Symbol.asyncIterator]();

const eventStream = new ReadableStream({
  // Client asks for more data
  async pull(controller) {
      const { done, value } = await completionStreamIterator.next();
      
      // Nothing more from OpenAI
      if (done) {
       // Close client connection, we could send a "finish" SSE custom event before closing
        controller.close()
      }
      controller.enqueue(`data: ${value.choices[0]?.delta?.content || ""}\n\n`);
  },
  // Client aborts the connection
  async cancel(reason) {
      // Cancel OpenAI upstream
      await completionStreamIterator.return?.(reason);
  }
}).pipeThrough(new TextEncoderStream());
 
// Respond with the stream and SSE required headers
return new Response(eventStream, {
  headers: {
    ...corsHeaders,
    "Content-Type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-store",
    Connection: "keep-alive",
  },
});
  

@arnaudbzn
Copy link

@yusukebe When testing my openai proxy (not using hono c.stream) with CloudFlare functions I get more than 100ms CPU time according to the completion length form OpenAI (so way above the 50ms CPU time limit).

What CPU time you get with the c.stream based solution?

@arnaudbzn
Copy link

arnaudbzn commented Sep 15, 2023

About SSE and cancellation with bun there is this issue:
Bun does not support http2 right now and the cancellation management is a bit tricky:
oven-sh/bun#2443
oven-sh/bun#3390

@watany-dev
Copy link
Contributor

Similarly, it appears that AWS Lambda's response streaming only supports HTTP1.1 as well.

You can find more information at

It's a challenging question to decide how generic to make the system, but I think it would be good to have options in the roadmap.

@arnaudbzn
Copy link

arnaudbzn commented Sep 15, 2023

There is no issues with HTTP1.1, only the limit of 6 parallel SSE connections per domain. The code stays the same.
The only JS runtime that requires a specific request cancellation management is bun for the moment:

async pull(controller) {
   // with bun we have to check if the request is cancelled here
   if (req.signal.aborted) {
          controller.close();
          // Cancel OpenAI upstream
          await completionStreamIterator.return?.(reason);
          return;
    }
    // ...
 }

@watany-dev
Copy link
Contributor

I see...if HTTP 1.1 support is only for AWS lambda. There is no need to support it in this commit. There are other complicated differences, so I will absorb them with adaptor.

@yusukebe
Copy link
Member

@arnaudbzn

When testing my openai proxy (not using hono c.stream) with CloudFlare functions I get more than 100ms CPU time according to the completion length form OpenAI (so way above the 50ms CPU time limit).

What CPU time you get with the c.stream based solution?

I think c.stream() doesn't resolve the problem, but when I tried a ChatGPT streaming gateway on Cloudflare Pages, the CPU time is not so long. It works well.

@yusukebe
Copy link
Member

yusukebe commented Sep 16, 2023

Hi @arnaudbzn @watany-dev

Thank you for your comments.

c.stream() is not dependent of HTTP version. With HTTP/1.1 on Wrangler, Node.js, Deno, and Bun, it will be chunked responses. In the case of Cloudflare, With HTTP/2 or H3, the transfer-encoding header with chunked set will be removed and the response will be kept as streaming. We should not care about protocol versions in Hono implementation.

About SSE. SSE is not supported in c.stream(). If you want to use SSE, we don't know if we will adopt it, but you can create a helper to make it happen.

So what I'm saying is that c.stream() should be a simple function to output HTTP streaming responses. If you want to do something else, you can create your own ReadableStream.

@sor4chi

So let's build on the current implementation!

src/utils/stream.ts Outdated Show resolved Hide resolved
deno_dist/utils/stream.ts Outdated Show resolved Hide resolved
src/context.ts Outdated Show resolved Hide resolved
Copy link
Member

@yusukebe yusukebe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sor4chi

Almost done! See the comments.

deno_dist/context.ts Outdated Show resolved Hide resolved
src/context.test.ts Outdated Show resolved Hide resolved
src/context.test.ts Outdated Show resolved Hide resolved
@sor4chi
Copy link
Contributor Author

sor4chi commented Sep 16, 2023

@yusukebe
I'm done!

@yusukebe yusukebe changed the title Add c.stream() feat: Add c.stream() Sep 16, 2023
@yusukebe
Copy link
Member

Hey!

Now, I'll merge it into the next branch which will be released as a next minor version v3.7.0.

Awesome work, @sor4chi ! Thanks!

@yusukebe yusukebe changed the base branch from main to next September 16, 2023 14:27
@yusukebe yusukebe merged commit db1edcb into honojs:next Sep 16, 2023
yusukebe pushed a commit that referenced this pull request Sep 16, 2023
* feat: implement stream api utility-class

* test: write the test of StreamApi

* feat: implement `c.stream` to context

* test: write the test of `c.stream()`

* chore: denoify

* fix: extend for bytes, remove buffer system, add pipe and log interface

* test: update test about log, pipe, etc... for streaming API

* feat: extend textStream interface, remove utf-8 content-type

* test: add test about `c.textStream`

* refactor: update some args name

* chore: denoify

* fix: for deno, removed the optional parameter of `write` and `writeln`

* chore: denoify

* feat: add charset for textStream content-type header

* fix: rename textStream to streamText

* fix: reuse stream in streamText for bundle size

* feat: add `stream.wait()` api

* chore: denoify

* fix: rename `stream.wait` to `stream.sleep`

* test: use `stream.sleep` for waiting

* refactor: remove `stream.log`

* fix: remove preHeader from `c.stream()` and use `transfer-encoding` only `c.streamText()`

* chore: denoify

* refactoring: remove preHeader initialize

* test: reduce sleep duration

* chore: denoify

Co-authored-by: Glen Maddern <glenmaddern@gmail.com>
yusukebe pushed a commit that referenced this pull request Sep 20, 2023
* feat: implement stream api utility-class

* test: write the test of StreamApi

* feat: implement `c.stream` to context

* test: write the test of `c.stream()`

* chore: denoify

* fix: extend for bytes, remove buffer system, add pipe and log interface

* test: update test about log, pipe, etc... for streaming API

* feat: extend textStream interface, remove utf-8 content-type

* test: add test about `c.textStream`

* refactor: update some args name

* chore: denoify

* fix: for deno, removed the optional parameter of `write` and `writeln`

* chore: denoify

* feat: add charset for textStream content-type header

* fix: rename textStream to streamText

* fix: reuse stream in streamText for bundle size

* feat: add `stream.wait()` api

* chore: denoify

* fix: rename `stream.wait` to `stream.sleep`

* test: use `stream.sleep` for waiting

* refactor: remove `stream.log`

* fix: remove preHeader from `c.stream()` and use `transfer-encoding` only `c.streamText()`

* chore: denoify

* refactoring: remove preHeader initialize

* test: reduce sleep duration

* chore: denoify

Co-authored-by: Glen Maddern <glenmaddern@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants