diff --git a/src/json-rpc/json-rpc-client.ts b/src/json-rpc/json-rpc-client.ts index 6addb0a..2514706 100644 --- a/src/json-rpc/json-rpc-client.ts +++ b/src/json-rpc/json-rpc-client.ts @@ -110,9 +110,9 @@ export class JsonRpcClient> { } } - toStreamPair(): { input: WritableStream, output: ReadableStream } { - const input = new WritableStream({ write: message => this.accept(message) }); - const output = new ReadableStream({ start: controller => this.onrequest = message => controller.enqueue(message) }); - return { input, output }; + toStream(): ReadableWritablePair { + const readable = new ReadableStream({ start: controller => this.onrequest = message => controller.enqueue(message) }); + const writable = new WritableStream({ write: message => this.accept(message) }); + return { readable, writable }; } } diff --git a/src/json-rpc/json-rpc-dual-engine.ts b/src/json-rpc/json-rpc-dual-engine.ts index f606040..d8e9421 100644 --- a/src/json-rpc/json-rpc-dual-engine.ts +++ b/src/json-rpc/json-rpc-dual-engine.ts @@ -27,7 +27,7 @@ export class JsonRpcDualEngine { } } - toStream(): TransformStream { + toStream(): ReadableWritablePair { return new TransformStream({ start: controller => this.onmessage = response => controller.enqueue(response), transform: message => this.accept(message), diff --git a/src/json-rpc/json-rpc-server.test.ts b/src/json-rpc/json-rpc-server.test.ts index 1756949..cfcad5e 100644 --- a/src/json-rpc/json-rpc-server.test.ts +++ b/src/json-rpc/json-rpc-server.test.ts @@ -1,4 +1,4 @@ -import { expect} from 'expect'; +import { expect } from 'expect'; import { beforeEach, describe, it, mock } from 'node:test'; import { JsonRpcServer } from './json-rpc-server.js'; @@ -94,4 +94,29 @@ describe(JsonRpcServer.name, () => { expect(parsed).toEqual({ jsonrpc: '2.0', result: 'pong', id: 1 }); }); + + it('should work as a piped stream', async () => { + const input = new ReadableStream({ + start(controller) { + controller.enqueue(JSON.stringify({ jsonrpc: '2.0', method: 'ping', id: 1 })); + controller.close(); + }, + }); + const stream = server.toStream(); + const output = new WritableStream({ + start() { + this.chunks = []; + }, + write(chunk) { + this.chunks.push(chunk); + }, + close() { + expect(this.chunks).toEqual([JSON.stringify({ jsonrpc: '2.0', result: 'pong', id: 1 })]) + }, + }); + + expect.assertions(1); + + await input.pipeThrough(stream).pipeTo(output); + }); });