diff --git a/packages/connection/__test__/browser/index.test.ts b/packages/connection/__test__/browser/index.test.ts index 59953b1558..b5e29d271d 100644 --- a/packages/connection/__test__/browser/index.test.ts +++ b/packages/connection/__test__/browser/index.test.ts @@ -1,4 +1,4 @@ -import { furySerializer } from '@opensumi/ide-connection'; +import { WSWebSocketConnection, furySerializer } from '@opensumi/ide-connection'; import { ReconnectingWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection/drivers/reconnecting-websocket'; import { sleep } from '@opensumi/ide-core-common'; import { Server, WebSocket } from '@opensumi/mock-socket'; @@ -21,10 +21,11 @@ describe('connection browser', () => { let data2Received = false; mockServer.on('connection', (socket) => { - socket.on('message', (msg) => { + const connection = new WSWebSocketConnection(socket as any); + connection.onMessage((msg) => { const msgObj = furySerializer.deserialize(msg as Uint8Array); if (msgObj.kind === 'open') { - socket.send( + connection.send( furySerializer.serialize({ id: msgObj.id, kind: 'server-ready', diff --git a/packages/connection/__test__/common/frame-decoder.test.ts b/packages/connection/__test__/common/frame-decoder.test.ts index 7054522028..692f11b44d 100644 --- a/packages/connection/__test__/common/frame-decoder.test.ts +++ b/packages/connection/__test__/common/frame-decoder.test.ts @@ -35,7 +35,9 @@ console.timeEnd('createPayload'); // 1m const pressure = 1024 * 1024; -const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [LengthFieldBasedFrameDecoder.construct(v), v] as const); +const purePackets = [p1k, p64k, p128k, p5m, p10m].map( + (v) => [LengthFieldBasedFrameDecoder.construct(v).dump(), v] as const, +); const size = purePackets.reduce((acc, v) => acc + v[0].byteLength, 0); @@ -48,7 +50,7 @@ purePackets.forEach((v) => { }); const mixedPackets = [p1m, p5m].map((v) => { - const sumiPacket = LengthFieldBasedFrameDecoder.construct(v); + const sumiPacket = LengthFieldBasedFrameDecoder.construct(v).dump(); const newPacket = createPayload(1024 + sumiPacket.byteLength); newPacket.set(sumiPacket, 1024); return [newPacket, v] as const; @@ -59,7 +61,7 @@ const packets = [...purePackets, ...mixedPackets]; describe('frame decoder', () => { it('can create frame', () => { const content = new Uint8Array([1, 2, 3]); - const packet = LengthFieldBasedFrameDecoder.construct(content); + const packet = LengthFieldBasedFrameDecoder.construct(content).dump(); const reader = BinaryReader({}); reader.reset(packet); @@ -116,7 +118,7 @@ describe('frame decoder', () => { it('can decode a stream it has no valid length info', (done) => { const v = createPayload(1024); - const sumiPacket = LengthFieldBasedFrameDecoder.construct(v); + const sumiPacket = LengthFieldBasedFrameDecoder.construct(v).dump(); const decoder = new LengthFieldBasedFrameDecoder(); decoder.onData((data) => { diff --git a/packages/connection/src/common/buffers/buffers.ts b/packages/connection/src/common/buffers/buffers.ts index 583056910b..e3abacbcb9 100644 --- a/packages/connection/src/common/buffers/buffers.ts +++ b/packages/connection/src/common/buffers/buffers.ts @@ -5,6 +5,7 @@ */ export const emptyBuffer = new Uint8Array(0); +export const buffer4Capacity = new Uint8Array(4); export function copy( source: Uint8Array, @@ -72,6 +73,39 @@ export class Buffers { return target; } + slice4(start: number) { + let end = start + 4; + const buffers = this.buffers; + + if (end > this.size) { + end = this.size; + } + + if (start >= end) { + return emptyBuffer; + } + + let startBytes = 0; + let si = 0; + for (; si < buffers.length && startBytes + buffers[si].length <= start; si++) { + startBytes += buffers[si].length; + } + + const target = buffer4Capacity; + + let ti = 0; + for (let ii = si; ti < end - start && ii < buffers.length; ii++) { + const len = buffers[ii].length; + + const _start = ti === 0 ? start - startBytes : 0; + const _end = ti + len >= end - start ? Math.min(_start + (end - start) - ti, len) : len; + copy(buffers[ii], target, ti, _start, _end); + ti += _end - _start; + } + + return target; + } + pos(i: number): { buf: number; offset: number } { if (i < 0 || i >= this.size) { throw new Error(`out of range, ${i} not in [0, ${this.size})`); @@ -268,6 +302,12 @@ export class Cursor { return buffers; } + read4() { + const buffers = this.buffers.slice4(this.offset); + this.skip(4); + return buffers; + } + skip(n: number) { let count = 0; while (this.chunkIndex < this.buffers.buffers.length) { diff --git a/packages/connection/src/common/connection/drivers/frame-decoder.ts b/packages/connection/src/common/connection/drivers/frame-decoder.ts index eb5ef07b43..32005ee15e 100644 --- a/packages/connection/src/common/connection/drivers/frame-decoder.ts +++ b/packages/connection/src/common/connection/drivers/frame-decoder.ts @@ -1,21 +1,37 @@ import { BinaryWriter } from '@furyjs/fury/dist/lib/writer'; -import { Emitter, readUInt32LE } from '@opensumi/ide-core-common'; +import { MaybeNull, readUInt32LE } from '@opensumi/ide-core-common'; import { Buffers } from '../../buffers/buffers'; /** * You can use `Buffer.from('\r\n\r\n')` to get this indicator. */ -export const indicator = new Uint8Array([0x0d, 0x0a, 0x0d, 0x0a]); +export const indicator = new Uint8Array([0x0d, 0x0a, 0x0a, 0x0d]); + +/** + * The number of bytes in the length field. + * + * How many bytes are used to represent data length. + * + * For example, if the length field is 4 bytes, then the maximum length of the data is 2^32 = 4GB + */ +const lengthFieldLength = 4; /** * sticky packet unpacking problems are generally problems at the transport layer. * we use a length field to represent the length of the data, and then read the data according to the length */ export class LengthFieldBasedFrameDecoder { - protected dataEmitter = new Emitter(); - onData = this.dataEmitter.event; + private _onDataListener: MaybeNull<(data: Uint8Array) => void>; + onData(listener: (data: Uint8Array) => void) { + this._onDataListener = listener; + return { + dispose: () => { + this._onDataListener = null; + }, + }; + } protected buffers = new Buffers(); protected cursor = this.buffers.cursor(); @@ -24,15 +40,6 @@ export class LengthFieldBasedFrameDecoder { protected state = 0; - /** - * The number of bytes in the length field. - * - * How many bytes are used to represent data length. - * - * For example, if the length field is 4 bytes, then the maximum length of the data is 2^32 = 4GB - */ - lengthFieldLength = 4; - reset() { this.contentLength = -1; this.state = 0; @@ -57,7 +64,9 @@ export class LengthFieldBasedFrameDecoder { const binary = this.buffers.slice(start, end); - this.dataEmitter.fire(binary); + if (this._onDataListener) { + this._onDataListener(binary); + } if (this.buffers.byteLength > end) { this.contentLength = -1; @@ -93,13 +102,13 @@ export class LengthFieldBasedFrameDecoder { } if (this.contentLength === -1) { - if (this.cursor.offset + this.lengthFieldLength > bufferLength) { + if (this.cursor.offset + lengthFieldLength > bufferLength) { // Not enough data yet, wait for more data return false; } // read the content length - const buf = this.cursor.read(this.lengthFieldLength); + const buf = this.cursor.read4(); // fury writer use little endian this.contentLength = readUInt32LE(buf, 0); } @@ -123,8 +132,8 @@ export class LengthFieldBasedFrameDecoder { case 0: this.state = 1; break; - case 2: - this.state = 3; + case 3: + this.state = 4; break; default: this.state = 0; @@ -136,8 +145,8 @@ export class LengthFieldBasedFrameDecoder { case 1: this.state = 2; break; - case 3: - this.state = 4; + case 2: + this.state = 3; iter.return(); break; default: @@ -154,7 +163,7 @@ export class LengthFieldBasedFrameDecoder { } dispose() { - this.dataEmitter.dispose(); + this._onDataListener = undefined; this.buffers.dispose(); } @@ -165,6 +174,6 @@ export class LengthFieldBasedFrameDecoder { LengthFieldBasedFrameDecoder.writer.buffer(indicator); LengthFieldBasedFrameDecoder.writer.uint32(content.byteLength); LengthFieldBasedFrameDecoder.writer.buffer(content); - return LengthFieldBasedFrameDecoder.writer.dump(); + return LengthFieldBasedFrameDecoder.writer; } } diff --git a/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts b/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts index 4b4694da79..dd889ee689 100644 --- a/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts +++ b/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts @@ -4,17 +4,34 @@ import ReconnectingWebSocket, { UrlProvider, } from '@opensumi/reconnecting-websocket'; +import { chunkSize } from '../../constants'; + import { BaseConnection } from './base'; +import { LengthFieldBasedFrameDecoder } from './frame-decoder'; import type { ErrorEvent } from '@opensumi/reconnecting-websocket'; export class ReconnectingWebSocketConnection extends BaseConnection { - constructor(private socket: ReconnectingWebSocket) { + protected decoder = new LengthFieldBasedFrameDecoder(); + + protected constructor(private socket: ReconnectingWebSocket) { super(); + + if (socket.binaryType === 'arraybuffer') { + this.socket.addEventListener('message', this.arrayBufferHandler); + } else if (socket.binaryType === 'blob') { + throw new Error('blob is not implemented'); + } } send(data: Uint8Array): void { - this.socket.send(data); + const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); + const packet = handle.get(); + for (let i = 0; i < packet.byteLength; i += chunkSize) { + this.socket.send(packet.subarray(i, i + chunkSize)); + } + + handle.dispose(); } isOpen(): boolean { @@ -29,29 +46,8 @@ export class ReconnectingWebSocketConnection extends BaseConnection }, }; } - onMessage(cb: (data: Uint8Array) => void): IDisposable { - const handler = (e: MessageEvent) => { - let buffer: Promise; - if (e.data instanceof Blob) { - buffer = e.data.arrayBuffer(); - } else if (e.data instanceof ArrayBuffer) { - buffer = Promise.resolve(e.data); - } else if (e.data?.constructor?.name === 'Buffer') { - // Compatibility with nodejs Buffer in test environment - buffer = Promise.resolve(e.data); - } else { - throw new Error('unknown message type, expect Blob or ArrayBuffer, received: ' + typeof e.data); - } - buffer.then((v) => cb(new Uint8Array(v, 0, v.byteLength))); - }; - - this.socket.addEventListener('message', handler); - return { - dispose: () => { - this.socket.removeEventListener('message', handler); - }, - }; + return this.decoder.onData(cb); } onceClose(cb: (code?: number, reason?: string) => void): IDisposable { const disposable = this.onClose(wrapper); @@ -91,8 +87,13 @@ export class ReconnectingWebSocketConnection extends BaseConnection }; } + private arrayBufferHandler = (e: MessageEvent) => { + const buffer: ArrayBuffer = e.data; + this.decoder.push(new Uint8Array(buffer, 0, buffer.byteLength)); + }; + dispose(): void { - // do nothing + this.socket.removeEventListener('message', this.arrayBufferHandler); } static forURL(url: UrlProvider, protocols?: string | string[], options?: ReconnectingWebSocketOptions) { diff --git a/packages/connection/src/common/connection/drivers/stream.ts b/packages/connection/src/common/connection/drivers/stream.ts index 5639ec3d02..6a8603183c 100644 --- a/packages/connection/src/common/connection/drivers/stream.ts +++ b/packages/connection/src/common/connection/drivers/stream.ts @@ -21,10 +21,11 @@ export class StreamConnection extends BaseConnection { } send(data: Uint8Array): void { - const result = LengthFieldBasedFrameDecoder.construct(data); - this.writable.write(result, () => { + const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); + this.writable.write(handle.get(), () => { // TODO: logger error }); + handle.dispose(); } onMessage(cb: (data: Uint8Array) => void): IDisposable { diff --git a/packages/connection/src/common/connection/drivers/ws-websocket.ts b/packages/connection/src/common/connection/drivers/ws-websocket.ts index d353fbfb07..83b1d591ef 100644 --- a/packages/connection/src/common/connection/drivers/ws-websocket.ts +++ b/packages/connection/src/common/connection/drivers/ws-websocket.ts @@ -1,24 +1,34 @@ import { IDisposable } from '@opensumi/ide-core-common'; +import { chunkSize } from '../../constants'; + import { BaseConnection } from './base'; +import { LengthFieldBasedFrameDecoder } from './frame-decoder'; import type WebSocket from 'ws'; export class WSWebSocketConnection extends BaseConnection { + protected decoder = new LengthFieldBasedFrameDecoder(); + constructor(public socket: WebSocket) { super(); + this.socket.on('message', (data: Buffer) => { + this.decoder.push(data); + }); } + send(data: Uint8Array): void { - this.socket.send(data); + const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); + const packet = handle.get(); + for (let i = 0; i < packet.byteLength; i += chunkSize) { + this.socket.send(packet.subarray(i, i + chunkSize)); + } + + handle.dispose(); } onMessage(cb: (data: Uint8Array) => void): IDisposable { - this.socket.on('message', cb); - return { - dispose: () => { - this.socket.off('message', cb); - }, - }; + return this.decoder.onData(cb); } onceClose(cb: () => void): IDisposable { this.socket.once('close', cb); diff --git a/packages/connection/src/common/constants.ts b/packages/connection/src/common/constants.ts index c98cfccb5b..e8574a281e 100644 --- a/packages/connection/src/common/constants.ts +++ b/packages/connection/src/common/constants.ts @@ -1 +1,6 @@ export const METHOD_NOT_REGISTERED = '$$METHOD_NOT_REGISTERED'; + +/** + * 分片大小, 8MB + */ +export const chunkSize = 8 * 1024 * 1024; diff --git a/packages/connection/src/node/common-channel-handler.ts b/packages/connection/src/node/common-channel-handler.ts index 156613f256..076432dcce 100644 --- a/packages/connection/src/node/common-channel-handler.ts +++ b/packages/connection/src/node/common-channel-handler.ts @@ -42,8 +42,7 @@ export class CommonChannelHandler extends BaseCommonChannelHandler implements We ...this.options.wsServerOptions, }); this.wsServer.on('connection', (connection: WebSocket) => { - const wsConnection = new WSWebSocketConnection(connection); - this.receiveConnection(wsConnection); + this.receiveConnection(new WSWebSocketConnection(connection)); }); } diff --git a/packages/core-browser/__tests__/bootstrap/connection.test.ts b/packages/core-browser/__tests__/bootstrap/connection.test.ts index b87b9da2ac..6bf8ee0be7 100644 --- a/packages/core-browser/__tests__/bootstrap/connection.test.ts +++ b/packages/core-browser/__tests__/bootstrap/connection.test.ts @@ -1,6 +1,6 @@ import { WSChannelHandler } from '@opensumi/ide-connection/lib/browser'; import { ReconnectingWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection/drivers/reconnecting-websocket'; -import { BrowserConnectionErrorEvent, IEventBus } from '@opensumi/ide-core-common'; +import { BrowserConnectionErrorEvent, IEventBus, sleep } from '@opensumi/ide-core-common'; import { createBrowserInjector } from '@opensumi/ide-dev-tool/src/injector-helper'; import { MockInjector } from '@opensumi/ide-dev-tool/src/mock-injector'; import { Server, WebSocket } from '@opensumi/mock-socket'; @@ -34,11 +34,8 @@ describe('packages/core-browser/src/bootstrap/connection.test.ts', () => { const channelHandler = new WSChannelHandler(ReconnectingWebSocketConnection.forURL(fakeWSURL), 'test-client-id'); createConnectionService(injector, [], channelHandler); stateService.state = 'core_module_initialized'; - new Promise((resolve) => { - setTimeout(() => { - resolve(); - }, 4000); - }).then(() => { + + sleep(4000).then(() => { mockServer.simulate('error'); }); });