Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: feat: send data by chunk in websocket #3988

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions packages/connection/__test__/common/frame-decoder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ console.timeEnd('createPayload');
// 1m
const pressure = 1024 * 1024;

const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [LengthFieldBasedFrameDecoder.construct(v), v] as const);
const purePackets = [p1k, p64k, p128k, p5m, p10m].map(
(v) => [LengthFieldBasedFrameDecoder.construct(v).dump(), v] as const,
);

const size = purePackets.reduce((acc, v) => acc + v[0].byteLength, 0);

Expand All @@ -48,7 +50,7 @@ purePackets.forEach((v) => {
});

const mixedPackets = [p1m, p5m].map((v) => {
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v);
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v).dump();
const newPacket = createPayload(1024 + sumiPacket.byteLength);
newPacket.set(sumiPacket, 1024);
return [newPacket, v] as const;
Expand All @@ -59,7 +61,7 @@ const packets = [...purePackets, ...mixedPackets];
describe('frame decoder', () => {
it('can create frame', () => {
const content = new Uint8Array([1, 2, 3]);
const packet = LengthFieldBasedFrameDecoder.construct(content);
const packet = LengthFieldBasedFrameDecoder.construct(content).dump();
const reader = BinaryReader({});

reader.reset(packet);
Expand Down Expand Up @@ -116,7 +118,7 @@ describe('frame decoder', () => {

it('can decode a stream it has no valid length info', (done) => {
const v = createPayload(1024);
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v);
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v).dump();

const decoder = new LengthFieldBasedFrameDecoder();
decoder.onData((data) => {
Expand Down
21 changes: 15 additions & 6 deletions packages/connection/src/common/connection/drivers/frame-decoder.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BinaryWriter } from '@furyjs/fury/dist/lib/writer';

import { Emitter, readUInt32LE } from '@opensumi/ide-core-common';
import { MaybeNull, readUInt32LE } from '@opensumi/ide-core-common';

import { Buffers } from '../../buffers/buffers';

Expand All @@ -14,8 +14,15 @@ export const indicator = new Uint8Array([0x0d, 0x0a, 0x0d, 0x0a]);
* we use a length field to represent the length of the data, and then read the data according to the length
*/
export class LengthFieldBasedFrameDecoder {
protected dataEmitter = new Emitter<Uint8Array>();
onData = this.dataEmitter.event;
private _onDataListener: MaybeNull<(data: Uint8Array) => void>;
onData(listener: (data: Uint8Array) => void) {
this._onDataListener = listener;
return {
dispose: () => {
this._onDataListener = null;
},
};
}
Comment on lines +26 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

监听器管理方式的重大变更

这个变更将之前基于 Emitter 的多监听器方法改为了单一监听器方法。虽然这简化了事件处理机制,但也限制了类只能同时处理一个监听器。

考虑以下几点:

  1. 这种改变可能会影响依赖多个监听器的现有代码。
  2. 单一监听器模式可能会在某些使用场景下造成限制。

建议考虑以下改进:

  1. 如果确实需要多个监听器,可以考虑使用数组来存储多个监听器函数。
  2. 添加清晰的文档注释,说明这个类现在只支持单一监听器,以防止误用。
  3. 考虑添加一个 removeListener 方法,使 API 更加完整和直观。
private _onDataListeners: Array<(data: Uint8Array) => void> = [];

onData(listener: (data: Uint8Array) => void) {
  this._onDataListeners.push(listener);
  return {
    dispose: () => {
      const index = this._onDataListeners.indexOf(listener);
      if (index > -1) {
        this._onDataListeners.splice(index, 1);
      }
    },
  };
}

removeListener(listener: (data: Uint8Array) => void) {
  const index = this._onDataListeners.indexOf(listener);
  if (index > -1) {
    this._onDataListeners.splice(index, 1);
  }
}

这样的实现既保持了简单性,又提供了更大的灵活性。


protected buffers = new Buffers();
protected cursor = this.buffers.cursor();
Expand Down Expand Up @@ -57,7 +64,9 @@ export class LengthFieldBasedFrameDecoder {

const binary = this.buffers.slice(start, end);

this.dataEmitter.fire(binary);
if (this._onDataListener) {
this._onDataListener(binary);
}

if (this.buffers.byteLength > end) {
this.contentLength = -1;
Expand Down Expand Up @@ -154,7 +163,7 @@ export class LengthFieldBasedFrameDecoder {
}

dispose() {
this.dataEmitter.dispose();
this._onDataListener = undefined;
this.buffers.dispose();
}

Expand All @@ -165,6 +174,6 @@ export class LengthFieldBasedFrameDecoder {
LengthFieldBasedFrameDecoder.writer.buffer(indicator);
LengthFieldBasedFrameDecoder.writer.uint32(content.byteLength);
LengthFieldBasedFrameDecoder.writer.buffer(content);
return LengthFieldBasedFrameDecoder.writer.dump();
return LengthFieldBasedFrameDecoder.writer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,30 @@ import ReconnectingWebSocket, {
UrlProvider,
} from '@opensumi/reconnecting-websocket';

import { chunkSize } from '../../constants';

import { BaseConnection } from './base';
import { LengthFieldBasedFrameDecoder } from './frame-decoder';

import type { ErrorEvent } from '@opensumi/reconnecting-websocket';

export class ReconnectingWebSocketConnection extends BaseConnection<Uint8Array> {
protected decoder = new LengthFieldBasedFrameDecoder();

constructor(private socket: ReconnectingWebSocket) {
super();

this.socket.addEventListener('message', this.dataHandler);
}

send(data: Uint8Array): void {
this.socket.send(data);
const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
const packet = handle.get();
for (let i = 0; i < packet.byteLength; i += chunkSize) {
this.socket.send(packet.subarray(i, i + chunkSize));
}

handle.dispose();
}

isOpen(): boolean {
Expand All @@ -31,27 +44,7 @@ export class ReconnectingWebSocketConnection extends BaseConnection<Uint8Array>
}

onMessage(cb: (data: Uint8Array) => void): IDisposable {
const handler = (e: MessageEvent) => {
let buffer: Promise<ArrayBuffer>;
if (e.data instanceof Blob) {
buffer = e.data.arrayBuffer();
} else if (e.data instanceof ArrayBuffer) {
buffer = Promise.resolve(e.data);
} else if (e.data?.constructor?.name === 'Buffer') {
// Compatibility with nodejs Buffer in test environment
buffer = Promise.resolve(e.data);
} else {
throw new Error('unknown message type, expect Blob or ArrayBuffer, received: ' + typeof e.data);
}
buffer.then((v) => cb(new Uint8Array(v, 0, v.byteLength)));
};

this.socket.addEventListener('message', handler);
return {
dispose: () => {
this.socket.removeEventListener('message', handler);
},
};
return this.decoder.onData(cb);
}
onceClose(cb: (code?: number, reason?: string) => void): IDisposable {
const disposable = this.onClose(wrapper);
Expand Down Expand Up @@ -91,8 +84,23 @@ export class ReconnectingWebSocketConnection extends BaseConnection<Uint8Array>
};
}

private dataHandler = (e: MessageEvent) => {
let buffer: Promise<ArrayBuffer>;
if (e.data instanceof Blob) {
buffer = e.data.arrayBuffer();
} else if (e.data instanceof ArrayBuffer) {
buffer = Promise.resolve(e.data);
} else if (e.data?.constructor?.name === 'Buffer') {
// Compatibility with nodejs Buffer in test environment
buffer = Promise.resolve(e.data);
} else {
throw new Error('unknown message type, expect Blob or ArrayBuffer, received: ' + typeof e.data);
}
buffer.then((v) => this.decoder.push(new Uint8Array(v, 0, v.byteLength)));
};

dispose(): void {
// do nothing
this.socket.removeEventListener('message', this.dataHandler);
}

static forURL(url: UrlProvider, protocols?: string | string[], options?: ReconnectingWebSocketOptions) {
Expand Down
5 changes: 3 additions & 2 deletions packages/connection/src/common/connection/drivers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ export class StreamConnection extends BaseConnection<Uint8Array> {
}

send(data: Uint8Array): void {
const result = LengthFieldBasedFrameDecoder.construct(data);
this.writable.write(result, () => {
const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
this.writable.write(handle.get(), () => {
// TODO: logger error
});
handle.dispose();
}

onMessage(cb: (data: Uint8Array) => void): IDisposable {
Expand Down
24 changes: 17 additions & 7 deletions packages/connection/src/common/connection/drivers/ws-websocket.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,34 @@
import { IDisposable } from '@opensumi/ide-core-common';

import { chunkSize } from '../../constants';

import { BaseConnection } from './base';
import { LengthFieldBasedFrameDecoder } from './frame-decoder';

import type WebSocket from 'ws';

export class WSWebSocketConnection extends BaseConnection<Uint8Array> {
protected decoder = new LengthFieldBasedFrameDecoder();

constructor(public socket: WebSocket) {
super();
this.socket.on('message', (data: Buffer) => {
this.decoder.push(data);
});
bytemain marked this conversation as resolved.
Show resolved Hide resolved
}

send(data: Uint8Array): void {
this.socket.send(data);
const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
const packet = handle.get();
for (let i = 0; i < packet.byteLength; i += chunkSize) {
this.socket.send(packet.subarray(i, i + chunkSize));
}

handle.dispose();
Comment on lines +21 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send 方法的改进很好,但可以考虑进一步优化。

新的 send 方法使用 LengthFieldBasedFrameDecoder 构造数据包并分块发送,这是一个很好的改进,可以更好地处理大型消息。

为了进一步优化性能,您可以考虑以下建议:

  1. 使用 ArrayBufferSharedArrayBuffer 来减少内存复制。
  2. 考虑使用 WebSocket.bufferedAmount 来控制发送速率,避免缓冲区溢出。

示例实现:

send(data: Uint8Array): void {
  const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
  const packet = handle.get();
  
  const sendChunk = (start: number) => {
    while (start < packet.byteLength && this.socket.bufferedAmount < 1024 * 1024) { // 1MB buffer threshold
      const end = Math.min(start + chunkSize, packet.byteLength);
      this.socket.send(packet.subarray(start, end));
      start = end;
    }
    if (start < packet.byteLength) {
      setTimeout(() => sendChunk(start), 0);
    } else {
      handle.dispose();
    }
  };

  sendChunk(0);
}

这个实现使用了递归的方式来控制发送速率,避免一次性将所有数据推入缓冲区。

}

onMessage(cb: (data: Uint8Array) => void): IDisposable {
this.socket.on('message', cb);
return {
dispose: () => {
this.socket.off('message', cb);
},
};
return this.decoder.onData(cb);
}
onceClose(cb: () => void): IDisposable {
this.socket.once('close', cb);
Expand Down
5 changes: 5 additions & 0 deletions packages/connection/src/common/constants.ts
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
export const METHOD_NOT_REGISTERED = '$$METHOD_NOT_REGISTERED';

/**
* 分片大小, 8MB
*/
export const chunkSize = 8 * 1024 * 1024;
3 changes: 1 addition & 2 deletions packages/connection/src/node/common-channel-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ export class CommonChannelHandler extends BaseCommonChannelHandler implements We
...this.options.wsServerOptions,
});
this.wsServer.on('connection', (connection: WebSocket) => {
const wsConnection = new WSWebSocketConnection(connection);
this.receiveConnection(wsConnection);
this.receiveConnection(new WSWebSocketConnection(connection));
});
}

Expand Down
Loading