diff --git a/lib/socket.ts b/lib/socket.ts index 1221445218..b436ec5ad2 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -137,7 +137,7 @@ export class Socket< private readonly adapter: Adapter; private acks: Map void> = new Map(); private fns: Array<(event: Event, next: (err?: Error) => void) => void> = []; - private flags: BroadcastFlags = {}; + private flags: BroadcastFlags & { timeout?: number } = {}; private _anyListeners?: Array<(...args: any[]) => void>; /** @@ -207,9 +207,11 @@ export class Socket< // access last argument to see if it's an ACK callback if (typeof data[data.length - 1] === "function") { - debug("emitting packet with ack id %d", this.nsp._ids); - this.acks.set(this.nsp._ids, data.pop()); - packet.id = this.nsp._ids++; + const id = this.nsp._ids++; + debug("emitting packet with ack id %d", id); + + this.registerAckCallback(id, data.pop()); + packet.id = id; } const flags = Object.assign({}, this.flags); @@ -220,6 +222,27 @@ export class Socket< return true; } + /** + * @private + */ + private registerAckCallback(id: number, ack: (...args: any[]) => void): void { + const timeout = this.flags.timeout; + if (timeout === undefined) { + this.acks.set(id, ack); + return; + } + + const timer = setTimeout(() => { + debug("event with ack id %d has timed out after %d ms", id, timeout); + ack.call(this, new Error("operation has timed out")); + }, timeout); + + this.acks.set(id, (...args) => { + clearTimeout(timer); + ack.apply(this, [null, ...args]); + }); + } + /** * Targets a room when broadcasting. * @@ -567,6 +590,26 @@ export class Socket< return this.newBroadcastOperator().local; } + /** + * Sets a modifier for a subsequent event emission that the callback will be called with an error when the + * given number of milliseconds have elapsed without an acknowledgement from the client: + * + * ``` + * socket.timeout(5000).emit("my-event", (err) => { + * if (err) { + * // the client did not acknowledge the event in the given delay + * } + * }); + * ``` + * + * @returns self + * @public + */ + public timeout(timeout: number): this { + this.flags.timeout = timeout; + return this; + } + /** * Dispatch incoming event to socket listeners. * diff --git a/test/socket-timeout.ts b/test/socket-timeout.ts new file mode 100644 index 0000000000..7cc8d6c5dc --- /dev/null +++ b/test/socket-timeout.ts @@ -0,0 +1,34 @@ +import { Server } from ".."; +import { createClient, success } from "./support/util"; +import expect from "expect.js"; + +describe("timeout", () => { + it("should timeout if the client does not acknowledge the event", (done) => { + const io = new Server(0); + const client = createClient(io, "/"); + + io.on("connection", (socket) => { + socket.timeout(50).emit("unknown", (err) => { + expect(err).to.be.an(Error); + success(done, io, client); + }); + }); + }); + + it("should not timeout if the client does acknowledge the event", (done) => { + const io = new Server(0); + const client = createClient(io, "/"); + + client.on("echo", (arg, cb) => { + cb(arg); + }); + + io.on("connection", (socket) => { + socket.timeout(50).emit("echo", 42, (err, value) => { + expect(err).to.be(null); + expect(value).to.be(42); + success(done, io, client); + }); + }); + }); +}); diff --git a/test/socket.io.ts b/test/socket.io.ts index d6255e815c..4fa83ac59d 100644 --- a/test/socket.io.ts +++ b/test/socket.io.ts @@ -2888,4 +2888,6 @@ describe("socket.io", () => { }); }); }); + + require("./socket-timeout"); }); diff --git a/test/support/util.ts b/test/support/util.ts index 1cfb3c3e2c..5de5cf825a 100644 --- a/test/support/util.ts +++ b/test/support/util.ts @@ -1,3 +1,11 @@ +import type { Server } from "../.."; +import { + io as ioc, + ManagerOptions, + Socket as ClientSocket, + SocketOptions, +} from "socket.io-client"; + const expect = require("expect.js"); const i = expect.stringify; @@ -20,3 +28,19 @@ expect.Assertion.prototype.contain = function (...args) { } return contain.apply(this, args); }; + +export function createClient( + io: Server, + nsp: string, + opts?: ManagerOptions & SocketOptions +): ClientSocket { + // @ts-ignore + const port = io.httpServer.address().port; + return ioc(`http://localhost:${port}${nsp}`, opts); +} + +export function success(done: Function, io: Server, client: ClientSocket) { + io.close(); + client.disconnect(); + done(); +}