Skip to content

Commit

Permalink
feat: Add c.stream() (#1437)
Browse files Browse the repository at this point in the history
* feat: implement stream api utility-class

* test: write the test of StreamApi

* feat: implement `c.stream` to context

* test: write the test of `c.stream()`

* chore: denoify

* fix: extend for bytes, remove buffer system, add pipe and log interface

* test: update test about log, pipe, etc... for streaming API

* feat: extend textStream interface, remove utf-8 content-type

* test: add test about `c.textStream`

* refactor: update some args name

* chore: denoify

* fix: for deno, removed the optional parameter of `write` and `writeln`

* chore: denoify

* feat: add charset for textStream content-type header

* fix: rename textStream to streamText

* fix: reuse stream in streamText for bundle size

* feat: add `stream.wait()` api

* chore: denoify

* fix: rename `stream.wait` to `stream.sleep`

* test: use `stream.sleep` for waiting

* refactor: remove `stream.log`

* fix: remove preHeader from `c.stream()` and use `transfer-encoding` only `c.streamText()`

* chore: denoify

* refactoring: remove preHeader initialize

* test: reduce sleep duration

* chore: denoify

Co-authored-by: Glen Maddern <glenmaddern@gmail.com>
  • Loading branch information
sor4chi and geelen authored Sep 16, 2023
1 parent e4c4322 commit db1edcb
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 0 deletions.
28 changes: 28 additions & 0 deletions deno_dist/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Env, NotFoundHandler, Input, TypedResponse } from './types.ts'
import type { CookieOptions } from './utils/cookie.ts'
import { serialize } from './utils/cookie.ts'
import type { StatusCode } from './utils/http-status.ts'
import { StreamingApi } from './utils/stream.ts'
import type { JSONValue, InterfaceToType } from './utils/types.ts'

type Runtime = 'node' | 'deno' | 'bun' | 'workerd' | 'fastly' | 'edge-light' | 'lagon' | 'other'
Expand Down Expand Up @@ -371,6 +372,33 @@ export class Context<
return this.newResponse(null, status)
}

streamText = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
headers?: HeaderRecord
): Response => {
headers ??= {}
headers['content-type'] = 'text/plain; charset=UTF-8'
headers['x-content-type-options'] = 'nosniff'
headers['transfer-encoding'] = 'chunked'

return this.stream(cb, arg, headers)
}

stream = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
headers?: HeaderRecord
): Response => {
const { readable, writable } = new TransformStream()
const stream = new StreamingApi(writable)
cb(stream).finally(() => stream.close())

return typeof arg === 'number'
? this.newResponse(readable, arg, headers)
: this.newResponse(readable, arg)
}

/** @deprecated
* Use Cookie Middleware instead of `c.cookie()`. The `c.cookie()` will be removed in v4.
*
Expand Down
38 changes: 38 additions & 0 deletions deno_dist/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
export class StreamingApi {
private writer: WritableStreamDefaultWriter<Uint8Array>
private encoder: TextEncoder
private writable: WritableStream

constructor(writable: WritableStream) {
this.writable = writable
this.writer = writable.getWriter()
this.encoder = new TextEncoder()
}

async write(input: Uint8Array | string) {
if (typeof input === 'string') {
input = this.encoder.encode(input)
}
await this.writer.write(input)
return this
}

async writeln(input: string) {
await this.write(input + '\n')
return this
}

sleep(ms: number) {
return new Promise((res) => setTimeout(res, ms))
}

async close() {
await this.writer.close()
}

async pipe(body: ReadableStream) {
this.writer.releaseLock()
await body.pipeTo(this.writable, { preventClose: true })
this.writer = this.writable.getWriter()
}
}
35 changes: 35 additions & 0 deletions src/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,41 @@ describe('Pass a ResponseInit to respond methods', () => {
expect(res.headers.get('content-type')).toMatch(/^text\/html/)
expect(await res.text()).toBe('<h1>foo</h1>')
})

it('c.streamText()', async () => {
const res = c.streamText(async (stream) => {
for (let i = 0; i < 3; i++) {
await stream.write(`${i}`)
await stream.sleep(1)
}
})
if (!res.body) {
throw new Error('Body is null')
}
const reader = res.body.getReader()
const decoder = new TextDecoder()
for (let i = 0; i < 3; i++) {
const { value } = await reader.read()
expect(decoder.decode(value)).toEqual(`${i}`)
}
})

it('c.stream()', async () => {
const res = c.stream(async (stream) => {
for (let i = 0; i < 3; i++) {
await stream.write(new Uint8Array([i]))
await stream.sleep(1)
}
})
if (!res.body) {
throw new Error('Body is null')
}
const reader = res.body.getReader()
for (let i = 0; i < 3; i++) {
const { value } = await reader.read()
expect(value).toEqual(new Uint8Array([i]))
}
})
})

declare module './context' {
Expand Down
28 changes: 28 additions & 0 deletions src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Env, NotFoundHandler, Input, TypedResponse } from './types'
import type { CookieOptions } from './utils/cookie'
import { serialize } from './utils/cookie'
import type { StatusCode } from './utils/http-status'
import { StreamingApi } from './utils/stream'
import type { JSONValue, InterfaceToType } from './utils/types'

type Runtime = 'node' | 'deno' | 'bun' | 'workerd' | 'fastly' | 'edge-light' | 'lagon' | 'other'
Expand Down Expand Up @@ -371,6 +372,33 @@ export class Context<
return this.newResponse(null, status)
}

streamText = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
headers?: HeaderRecord
): Response => {
headers ??= {}
headers['content-type'] = 'text/plain; charset=UTF-8'
headers['x-content-type-options'] = 'nosniff'
headers['transfer-encoding'] = 'chunked'

return this.stream(cb, arg, headers)
}

stream = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
headers?: HeaderRecord
): Response => {
const { readable, writable } = new TransformStream()
const stream = new StreamingApi(writable)
cb(stream).finally(() => stream.close())

return typeof arg === 'number'
? this.newResponse(readable, arg, headers)
: this.newResponse(readable, arg)
}

/** @deprecated
* Use Cookie Middleware instead of `c.cookie()`. The `c.cookie()` will be removed in v4.
*
Expand Down
68 changes: 68 additions & 0 deletions src/utils/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { StreamingApi } from './stream'

describe('StreamingApi', () => {
it('write(string)', async () => {
const { readable, writable } = new TransformStream()
const api = new StreamingApi(writable)
const reader = readable.getReader()
api.write('foo')
expect((await reader.read()).value).toEqual(new TextEncoder().encode('foo'))
api.write('bar')
expect((await reader.read()).value).toEqual(new TextEncoder().encode('bar'))
})

it('write(Uint8Array)', async () => {
const { readable, writable } = new TransformStream()
const api = new StreamingApi(writable)
const reader = readable.getReader()
api.write(new Uint8Array([1, 2, 3]))
expect((await reader.read()).value).toEqual(new Uint8Array([1, 2, 3]))
api.write(new Uint8Array([4, 5, 6]))
expect((await reader.read()).value).toEqual(new Uint8Array([4, 5, 6]))
})

it('writeln(string)', async () => {
const { readable, writable } = new TransformStream()
const api = new StreamingApi(writable)
const reader = readable.getReader()
api.writeln('foo')
expect((await reader.read()).value).toEqual(new TextEncoder().encode('foo\n'))
api.writeln('bar')
expect((await reader.read()).value).toEqual(new TextEncoder().encode('bar\n'))
})

it('pipe()', async () => {
const { readable: senderReadable, writable: senderWritable } = new TransformStream()

// send data to readable in other scope
;(async () => {
const writer = senderWritable.getWriter()
await writer.write(new TextEncoder().encode('foo'))
await writer.write(new TextEncoder().encode('bar'))
// await writer.close()
})()

const { readable: receiverReadable, writable: receiverWritable } = new TransformStream()

const api = new StreamingApi(receiverWritable)

// pipe readable to api in other scope
;(async () => {
await api.pipe(senderReadable)
})()

// read data from api
const reader = receiverReadable.getReader()
expect((await reader.read()).value).toEqual(new TextEncoder().encode('foo'))
expect((await reader.read()).value).toEqual(new TextEncoder().encode('bar'))
})

it('close()', async () => {
const { readable, writable } = new TransformStream()
const api = new StreamingApi(writable)
const reader = readable.getReader()
await api.close()
expect((await reader.read()).done).toBe(true)
await expect(api.write('foo')).rejects.toThrow()
})
})
38 changes: 38 additions & 0 deletions src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
export class StreamingApi {
private writer: WritableStreamDefaultWriter<Uint8Array>
private encoder: TextEncoder
private writable: WritableStream

constructor(writable: WritableStream) {
this.writable = writable
this.writer = writable.getWriter()
this.encoder = new TextEncoder()
}

async write(input: Uint8Array | string) {
if (typeof input === 'string') {
input = this.encoder.encode(input)
}
await this.writer.write(input)
return this
}

async writeln(input: string) {
await this.write(input + '\n')
return this
}

sleep(ms: number) {
return new Promise((res) => setTimeout(res, ms))
}

async close() {
await this.writer.close()
}

async pipe(body: ReadableStream) {
this.writer.releaseLock()
await body.pipeTo(this.writable, { preventClose: true })
this.writer = this.writable.getWriter()
}
}

0 comments on commit db1edcb

Please sign in to comment.