Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add c.stream() #1437

Merged
merged 26 commits into from
Sep 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f121e58
feat: implement stream api utility-class
sor4chi Sep 11, 2023
93a033a
test: write the test of StreamApi
sor4chi Sep 11, 2023
490853e
feat: implement `c.stream` to context
sor4chi Sep 11, 2023
b867400
test: write the test of `c.stream()`
sor4chi Sep 11, 2023
421e801
chore: denoify
sor4chi Sep 11, 2023
38b4cf5
fix: extend for bytes, remove buffer system, add pipe and log interface
sor4chi Sep 13, 2023
19d6549
test: update test about log, pipe, etc... for streaming API
sor4chi Sep 13, 2023
f757c3f
feat: extend textStream interface, remove utf-8 content-type
sor4chi Sep 13, 2023
f653f5c
test: add test about `c.textStream`
sor4chi Sep 13, 2023
1cfb214
refactor: update some args name
sor4chi Sep 13, 2023
3ee4779
chore: denoify
sor4chi Sep 13, 2023
e50e303
fix: for deno, removed the optional parameter of `write` and `writeln`
sor4chi Sep 13, 2023
4ea0e77
chore: denoify
sor4chi Sep 13, 2023
0ef0232
feat: add charset for textStream content-type header
sor4chi Sep 15, 2023
4dd25e1
fix: rename textStream to streamText
sor4chi Sep 15, 2023
4970d7f
fix: reuse stream in streamText for bundle size
sor4chi Sep 15, 2023
00b854f
feat: add `stream.wait()` api
sor4chi Sep 15, 2023
94d85b4
chore: denoify
sor4chi Sep 15, 2023
f73c560
fix: rename `stream.wait` to `stream.sleep`
sor4chi Sep 16, 2023
6e14dae
test: use `stream.sleep` for waiting
sor4chi Sep 16, 2023
add0f48
refactor: remove `stream.log`
sor4chi Sep 16, 2023
7443350
fix: remove preHeader from `c.stream()` and use `transfer-encoding` o…
sor4chi Sep 16, 2023
1ff9550
chore: denoify
sor4chi Sep 16, 2023
76395f2
refactoring: remove preHeader initialize
sor4chi Sep 16, 2023
ab18e15
test: reduce sleep duration
sor4chi Sep 16, 2023
7eff33b
chore: denoify
sor4chi Sep 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions deno_dist/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Env, NotFoundHandler, Input, TypedResponse } from './types.ts'
import type { CookieOptions } from './utils/cookie.ts'
import { serialize } from './utils/cookie.ts'
import type { StatusCode } from './utils/http-status.ts'
import { StreamingApi } from './utils/stream.ts'
import type { JSONValue, InterfaceToType } from './utils/types.ts'

type Runtime = 'node' | 'deno' | 'bun' | 'workerd' | 'fastly' | 'edge-light' | 'lagon' | 'other'
Expand Down Expand Up @@ -335,6 +336,33 @@ export class Context<
return this.newResponse(null, status)
}

streamText = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
headers?: HeaderRecord
): Response => {
headers ??= {}
headers['content-type'] = 'text/plain; charset=UTF-8'
headers['x-content-type-options'] = 'nosniff'
headers['transfer-encoding'] = 'chunked'

return this.stream(cb, arg, headers)
}

stream = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
headers?: HeaderRecord
): Response => {
const { readable, writable } = new TransformStream()
const stream = new StreamingApi(writable)
cb(stream).finally(() => stream.close())

return typeof arg === 'number'
? this.newResponse(readable, arg, headers)
: this.newResponse(readable, arg)
}

/** @deprecated
* Use Cookie Middleware instead of `c.cookie()`. The `c.cookie()` will be removed in v4.
*
Expand Down
38 changes: 38 additions & 0 deletions deno_dist/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
export class StreamingApi {
private writer: WritableStreamDefaultWriter<Uint8Array>
private encoder: TextEncoder
private writable: WritableStream

constructor(writable: WritableStream) {
this.writable = writable
this.writer = writable.getWriter()
this.encoder = new TextEncoder()
}

async write(input: Uint8Array | string) {
if (typeof input === 'string') {
input = this.encoder.encode(input)
}
await this.writer.write(input)
return this
}

async writeln(input: string) {
await this.write(input + '\n')
return this
}

yusukebe marked this conversation as resolved.
Show resolved Hide resolved
sleep(ms: number) {
return new Promise((res) => setTimeout(res, ms))
}

async close() {
await this.writer.close()
}

async pipe(body: ReadableStream) {
this.writer.releaseLock()
await body.pipeTo(this.writable, { preventClose: true })
this.writer = this.writable.getWriter()
}
}
35 changes: 35 additions & 0 deletions src/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,39 @@ describe('Pass a ResponseInit to respond methods', () => {
expect(res.headers.get('content-type')).toMatch(/^text\/html/)
expect(await res.text()).toBe('<h1>foo</h1>')
})

it('c.streamText()', async () => {
const res = c.streamText(async (stream) => {
for (let i = 0; i < 3; i++) {
await stream.write(`${i}`)
await stream.sleep(1)
}
})
if (!res.body) {
throw new Error('Body is null')
}
const reader = res.body.getReader()
const decoder = new TextDecoder()
for (let i = 0; i < 3; i++) {
const { value } = await reader.read()
expect(decoder.decode(value)).toEqual(`${i}`)
}
})

it('c.stream()', async () => {
const res = c.stream(async (stream) => {
for (let i = 0; i < 3; i++) {
await stream.write(new Uint8Array([i]))
await stream.sleep(1)
}
})
if (!res.body) {
throw new Error('Body is null')
}
const reader = res.body.getReader()
for (let i = 0; i < 3; i++) {
const { value } = await reader.read()
expect(value).toEqual(new Uint8Array([i]))
}
})
})
28 changes: 28 additions & 0 deletions src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Env, NotFoundHandler, Input, TypedResponse } from './types'
import type { CookieOptions } from './utils/cookie'
import { serialize } from './utils/cookie'
import type { StatusCode } from './utils/http-status'
import { StreamingApi } from './utils/stream'
import type { JSONValue, InterfaceToType } from './utils/types'

type Runtime = 'node' | 'deno' | 'bun' | 'workerd' | 'fastly' | 'edge-light' | 'lagon' | 'other'
Expand Down Expand Up @@ -335,6 +336,33 @@ export class Context<
return this.newResponse(null, status)
}

streamText = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
headers?: HeaderRecord
): Response => {
headers ??= {}
headers['content-type'] = 'text/plain; charset=UTF-8'
headers['x-content-type-options'] = 'nosniff'
headers['transfer-encoding'] = 'chunked'

return this.stream(cb, arg, headers)
}

stream = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
headers?: HeaderRecord
): Response => {
const { readable, writable } = new TransformStream()
const stream = new StreamingApi(writable)
cb(stream).finally(() => stream.close())

return typeof arg === 'number'
? this.newResponse(readable, arg, headers)
: this.newResponse(readable, arg)
}

/** @deprecated
* Use Cookie Middleware instead of `c.cookie()`. The `c.cookie()` will be removed in v4.
*
Expand Down
68 changes: 68 additions & 0 deletions src/utils/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { StreamingApi } from './stream'

describe('StreamingApi', () => {
it('write(string)', async () => {
const { readable, writable } = new TransformStream()
const api = new StreamingApi(writable)
const reader = readable.getReader()
api.write('foo')
expect((await reader.read()).value).toEqual(new TextEncoder().encode('foo'))
api.write('bar')
expect((await reader.read()).value).toEqual(new TextEncoder().encode('bar'))
})

it('write(Uint8Array)', async () => {
const { readable, writable } = new TransformStream()
const api = new StreamingApi(writable)
const reader = readable.getReader()
api.write(new Uint8Array([1, 2, 3]))
expect((await reader.read()).value).toEqual(new Uint8Array([1, 2, 3]))
api.write(new Uint8Array([4, 5, 6]))
expect((await reader.read()).value).toEqual(new Uint8Array([4, 5, 6]))
})

it('writeln(string)', async () => {
const { readable, writable } = new TransformStream()
const api = new StreamingApi(writable)
const reader = readable.getReader()
api.writeln('foo')
expect((await reader.read()).value).toEqual(new TextEncoder().encode('foo\n'))
api.writeln('bar')
expect((await reader.read()).value).toEqual(new TextEncoder().encode('bar\n'))
})

it('pipe()', async () => {
const { readable: senderReadable, writable: senderWritable } = new TransformStream()

// send data to readable in other scope
;(async () => {
const writer = senderWritable.getWriter()
await writer.write(new TextEncoder().encode('foo'))
await writer.write(new TextEncoder().encode('bar'))
// await writer.close()
})()

const { readable: receiverReadable, writable: receiverWritable } = new TransformStream()

const api = new StreamingApi(receiverWritable)

// pipe readable to api in other scope
;(async () => {
await api.pipe(senderReadable)
})()

// read data from api
const reader = receiverReadable.getReader()
expect((await reader.read()).value).toEqual(new TextEncoder().encode('foo'))
expect((await reader.read()).value).toEqual(new TextEncoder().encode('bar'))
})

it('close()', async () => {
const { readable, writable } = new TransformStream()
const api = new StreamingApi(writable)
const reader = readable.getReader()
await api.close()
expect((await reader.read()).done).toBe(true)
await expect(api.write('foo')).rejects.toThrow()
})
})
38 changes: 38 additions & 0 deletions src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
export class StreamingApi {
private writer: WritableStreamDefaultWriter<Uint8Array>
private encoder: TextEncoder
private writable: WritableStream

constructor(writable: WritableStream) {
this.writable = writable
this.writer = writable.getWriter()
this.encoder = new TextEncoder()
}

async write(input: Uint8Array | string) {
if (typeof input === 'string') {
input = this.encoder.encode(input)
}
await this.writer.write(input)
return this
}

async writeln(input: string) {
await this.write(input + '\n')
return this
}

sleep(ms: number) {
return new Promise((res) => setTimeout(res, ms))
}

async close() {
await this.writer.close()
}

async pipe(body: ReadableStream) {
this.writer.releaseLock()
await body.pipeTo(this.writable, { preventClose: true })
this.writer = this.writable.getWriter()
}
}