diff --git a/spec/integ/sliding-sync.spec.ts b/spec/integ/sliding-sync.spec.ts index 9cf6ff2e9cd..8c4a7ad1254 100644 --- a/spec/integ/sliding-sync.spec.ts +++ b/spec/integ/sliding-sync.spec.ts @@ -562,6 +562,225 @@ describe("SlidingSync", () => { }); }); + describe("transaction IDs", () => { + beforeAll(setupClient); + afterAll(teardownClient); + const roomId = "!foo:bar"; + + let slidingSync: SlidingSync; + + // really this applies to them all but it's easier to just test one + it("should resolve modifyRoomSubscriptions after SlidingSync.start() is called", async () => { + const roomSubInfo = { + timeline_limit: 1, + required_state: [ + ["m.room.name", ""], + ], + }; + // add the subscription + slidingSync = new SlidingSync(proxyBaseUrl, [], roomSubInfo, client, 1); + // modification before SlidingSync.start() + const subscribePromise = slidingSync.modifyRoomSubscriptions(new Set([roomId])); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.debug("got ", body); + expect(body.room_subscriptions).toBeTruthy(); + expect(body.room_subscriptions[roomId]).toEqual(roomSubInfo); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "aaa", + txn_id: txnId, + lists: [], + extensions: {}, + rooms: { + [roomId]: { + name: "foo bar", + required_state: [], + timeline: [], + }, + }, + }; + }); + slidingSync.start(); + await httpBackend.flushAllExpected(); + await subscribePromise; + }); + it("should resolve setList during a connection", async () => { + const newList = { + ranges: [[0, 20]], + }; + const promise = slidingSync.setList(0, newList); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.debug("got ", body); + expect(body.room_subscriptions).toBeFalsy(); + expect(body.lists[0]).toEqual(newList); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "bbb", + txn_id: txnId, + lists: [{ count: 5 }], + extensions: {}, + }; + }); + await httpBackend.flushAllExpected(); + await promise; + expect(txnId).toBeDefined(); + }); + it("should resolve setListRanges during a connection", async () => { + const promise = slidingSync.setListRanges(0, [[20, 40]]); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.debug("got ", body); + expect(body.room_subscriptions).toBeFalsy(); + expect(body.lists[0]).toEqual({ + ranges: [[20, 40]], + }); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "ccc", + txn_id: txnId, + lists: [{ count: 5 }], + extensions: {}, + }; + }); + await httpBackend.flushAllExpected(); + await promise; + expect(txnId).toBeDefined(); + }); + it("should resolve modifyRoomSubscriptionInfo during a connection", async () => { + const promise = slidingSync.modifyRoomSubscriptionInfo({ + timeline_limit: 99, + }); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.debug("got ", body); + expect(body.room_subscriptions).toBeTruthy(); + expect(body.room_subscriptions[roomId]).toEqual({ + timeline_limit: 99, + }); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "ddd", + txn_id: txnId, + extensions: {}, + }; + }); + await httpBackend.flushAllExpected(); + await promise; + expect(txnId).toBeDefined(); + }); + it("should reject earlier pending promises if a later transaction is acknowledged", async () => { + // i.e if we have [A,B,C] and see txn_id=C then A,B should be rejected. + const gotTxnIds = []; + const pushTxn = function(req) { + gotTxnIds.push(req.data.txn_id); + }; + const failPromise = slidingSync.setListRanges(0, [[20, 40]]); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "e" }); // missing txn_id + await httpBackend.flushAllExpected(); + const failPromise2 = slidingSync.setListRanges(0, [[60, 70]]); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "f" }); // missing txn_id + await httpBackend.flushAllExpected(); + + // attach rejection handlers now else if we do it later Jest treats that as an unhandled rejection + // which is a fail. + expect(failPromise).rejects.toEqual(gotTxnIds[0]); + expect(failPromise2).rejects.toEqual(gotTxnIds[1]); + + const okPromise = slidingSync.setListRanges(0, [[0, 20]]); + let txnId; + httpBackend.when("POST", syncUrl).check((req) => { + txnId = req.data.txn_id; + }).respond(200, () => { + // include the txn_id, earlier requests should now be reject()ed. + return { + pos: "g", + txn_id: txnId, + }; + }); + await httpBackend.flushAllExpected(); + await okPromise; + + expect(txnId).toBeDefined(); + }); + it("should not reject later pending promises if an earlier transaction is acknowledged", async () => { + // i.e if we have [A,B,C] and see txn_id=B then C should not be rejected but A should. + const gotTxnIds = []; + const pushTxn = function(req) { + gotTxnIds.push(req.data.txn_id); + }; + const A = slidingSync.setListRanges(0, [[20, 40]]); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "A" }); + await httpBackend.flushAllExpected(); + const B = slidingSync.setListRanges(0, [[60, 70]]); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "B" }); // missing txn_id + await httpBackend.flushAllExpected(); + + // attach rejection handlers now else if we do it later Jest treats that as an unhandled rejection + // which is a fail. + expect(A).rejects.toEqual(gotTxnIds[0]); + + const C = slidingSync.setListRanges(0, [[0, 20]]); + let pendingC = true; + C.finally(() => { + pendingC = false; + }); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, () => { + // include the txn_id for B, so C's promise is outstanding + return { + pos: "C", + txn_id: gotTxnIds[1], + }; + }); + await httpBackend.flushAllExpected(); + // A is rejected, see above + expect(B).resolves.toEqual(gotTxnIds[1]); // B is resolved + expect(pendingC).toBe(true); // C is pending still + }); + it("should do nothing for unknown txn_ids", async () => { + const promise = slidingSync.setListRanges(0, [[20, 40]]); + let pending = true; + promise.finally(() => { + pending = false; + }); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.debug("got ", body); + expect(body.room_subscriptions).toBeFalsy(); + expect(body.lists[0]).toEqual({ + ranges: [[20, 40]], + }); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "ccc", + txn_id: "bogus transaction id", + lists: [{ count: 5 }], + extensions: {}, + }; + }); + await httpBackend.flushAllExpected(); + expect(txnId).toBeDefined(); + expect(pending).toBe(true); + slidingSync.stop(); + }); + }); + describe("extensions", () => { beforeAll(setupClient); afterAll(teardownClient); diff --git a/src/sliding-sync.ts b/src/sliding-sync.ts index 5254a077dcf..28026b3a999 100644 --- a/src/sliding-sync.ts +++ b/src/sliding-sync.ts @@ -19,7 +19,7 @@ import { IAbortablePromise } from "./@types/partials"; import { MatrixClient } from "./client"; import { IRoomEvent, IStateEvent } from "./sync-accumulator"; import { TypedEventEmitter } from "./models//typed-event-emitter"; -import { sleep } from "./utils"; +import { sleep, IDeferred, defer } from "./utils"; // /sync requests allow you to set a timeout= but the request may continue // beyond that and wedge forever, so we need to track how long we are willing @@ -68,6 +68,7 @@ export interface MSC3575SlidingSyncRequest { unsubscribe_rooms?: string[]; room_subscriptions?: Record; extensions?: object; + txn_id?: string; // query params pos?: string; @@ -126,6 +127,7 @@ type Operation = DeleteOperation | InsertOperation | InvalidateOperation | SyncO */ export interface MSC3575SlidingSyncResponse { pos: string; + txn_id?: string; lists: ListResponse[]; rooms: Record; extensions: object; @@ -334,6 +336,11 @@ export class SlidingSync extends TypedEventEmitter & { txnId: string})[] = []; // map of extension name to req/resp handler private extensions: Record = {}; @@ -403,10 +410,13 @@ export class SlidingSync extends TypedEventEmitter { this.lists[index].updateListRange(ranges); - this.resend(); + return this.resend(); } /** @@ -414,15 +424,18 @@ export class SlidingSync extends TypedEventEmitter { if (this.lists[index]) { this.lists[index].replaceList(list); } else { this.lists[index] = new SlidingList(list); } this.listModifiedCount += 1; - this.resend(); + return this.resend(); } /** @@ -438,21 +451,27 @@ export class SlidingSync extends TypedEventEmitter) { + public modifyRoomSubscriptions(s: Set): Promise { this.desiredRoomSubscriptions = s; - this.resend(); + return this.resend(); } /** * Modify which events to retrieve for room subscriptions. Invalidates all room subscriptions * such that they will be sent up afresh. * @param rs The new room subscription fields to fetch. + * @return A promise which resolves to the transaction ID when it has been received down sync + * (or rejects with the transaction ID if the action was not applied e.g the request was cancelled + * immediately after sending, in which case the action will be applied in the subsequent request) */ - public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): void { + public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): Promise { this.roomSubscriptionInfo = rs; this.confirmedRoomSubscriptions = new Set(); - this.resend(); + return this.resend(); } /** @@ -615,11 +634,52 @@ export class SlidingSync extends TypedEventEmitter { + if (this.needsResend && this.txnIdDefers.length > 0) { + // we already have a resend queued, so just return the same promise + return this.txnIdDefers[this.txnIdDefers.length-1].promise; + } this.needsResend = true; + this.txnId = this.client.makeTxnId(); + const d = defer(); + this.txnIdDefers.push({ + ...d, + txnId: this.txnId, + }); this.pendingReq?.abort(); + return d.promise; + } + + private resolveTransactionDefers(txnId?: string) { + if (!txnId) { + return; + } + // find the matching index + let txnIndex = -1; + for (let i = 0; i < this.txnIdDefers.length; i++) { + if (this.txnIdDefers[i].txnId === txnId) { + txnIndex = i; + break; + } + } + if (txnIndex === -1) { + // this shouldn't happen; we shouldn't be seeing txn_ids for things we don't know about, + // whine about it. + logger.warn(`resolveTransactionDefers: seen ${txnId} but it isn't a pending txn, ignoring.`); + return; + } + // This list is sorted in time, so if the input txnId ACKs in the middle of this array, + // then everything before it that hasn't been ACKed yet never will and we should reject them. + for (let i = 0; i < txnIndex; i++) { + this.txnIdDefers[i].reject(this.txnIdDefers[i].txnId); + } + this.txnIdDefers[txnIndex].resolve(txnId); + // clear out settled promises, incuding the one we resolved. + this.txnIdDefers = this.txnIdDefers.slice(txnIndex+1); } /** @@ -666,6 +726,10 @@ export class SlidingSync extends TypedEventEmitter