From e05f9b58150b506b810b1439cfa0cf58479600b3 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 5 Aug 2022 17:28:02 +0100 Subject: [PATCH 1/3] Add txn_id support to sliding sync This allows clients to know when a request has been applied on the server. This allows us to change `resend(): void` to `resend(): Promise` which resolves/rejects with the transaction ID when it has been applied/not. --- spec/integ/sliding-sync.spec.ts | 219 ++++++++++++++++++++++++++++++++ src/sliding-sync.ts | 74 +++++++++-- 2 files changed, 283 insertions(+), 10 deletions(-) diff --git a/spec/integ/sliding-sync.spec.ts b/spec/integ/sliding-sync.spec.ts index 9cf6ff2e9cd..cc7a1d0dd93 100644 --- a/spec/integ/sliding-sync.spec.ts +++ b/spec/integ/sliding-sync.spec.ts @@ -562,6 +562,225 @@ describe("SlidingSync", () => { }); }); + describe("transaction IDs", () => { + beforeAll(setupClient); + afterAll(teardownClient); + const roomId = "!foo:bar"; + + let slidingSync: SlidingSync; + + // really this applies to them all but it's easier to just test one + it("should resolve modifyRoomSubscriptions after SlidingSync.start() is called", async () => { + const roomSubInfo = { + timeline_limit: 1, + required_state: [ + ["m.room.name", ""], + ], + }; + // add the subscription + slidingSync = new SlidingSync(proxyBaseUrl, [], roomSubInfo, client, 1); + // modification before SlidingSync.start() + const subscribePromise = slidingSync.modifyRoomSubscriptions(new Set([roomId])); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("txn got ", body); + expect(body.room_subscriptions).toBeTruthy(); + expect(body.room_subscriptions[roomId]).toEqual(roomSubInfo); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "aaa", + txn_id: txnId, + lists: [], + extensions: {}, + rooms: { + [roomId]: { + name: "foo bar", + required_state: [], + timeline: [], + }, + }, + }; + }); + slidingSync.start(); + await httpBackend.flushAllExpected(); + await subscribePromise; + }); + it("should resolve setList during a connection", async () => { + const newList = { + ranges: [[0, 20]], + }; + const promise = slidingSync.setList(0, newList); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("txn got ", body); + expect(body.room_subscriptions).toBeFalsy(); + expect(body.lists[0]).toEqual(newList); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "bbb", + txn_id: txnId, + lists: [{ count: 5 }], + extensions: {}, + }; + }); + await httpBackend.flushAllExpected(); + await promise; + expect(txnId).toBeDefined(); + }); + it("should resolve setListRanges during a connection", async () => { + const promise = slidingSync.setListRanges(0, [[20, 40]]); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("txn got ", body); + expect(body.room_subscriptions).toBeFalsy(); + expect(body.lists[0]).toEqual({ + ranges: [[20, 40]], + }); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "ccc", + txn_id: txnId, + lists: [{ count: 5 }], + extensions: {}, + }; + }); + await httpBackend.flushAllExpected(); + await promise; + expect(txnId).toBeDefined(); + }); + it("should resolve modifyRoomSubscriptionInfo during a connection", async () => { + const promise = slidingSync.modifyRoomSubscriptionInfo({ + timeline_limit: 99, + }); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("txn got ", body); + expect(body.room_subscriptions).toBeTruthy(); + expect(body.room_subscriptions[roomId]).toEqual({ + timeline_limit: 99, + }); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "ddd", + txn_id: txnId, + extensions: {}, + }; + }); + await httpBackend.flushAllExpected(); + await promise; + expect(txnId).toBeDefined(); + }); + it("should reject earlier pending promises if a later transaction is acknowledged", async () => { + // i.e if we have [A,B,C] and see txn_id=C then A,B should be rejected. + const gotTxnIds = []; + const pushTxn = function(req) { + gotTxnIds.push(req.data.txn_id); + }; + const failPromise = slidingSync.setListRanges(0, [[20, 40]]); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "e" }); // missing txn_id + await httpBackend.flushAllExpected(); + const failPromise2 = slidingSync.setListRanges(0, [[60, 70]]); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "f" }); // missing txn_id + await httpBackend.flushAllExpected(); + + // attach rejection handlers now else if we do it later Jest treats that as an unhandled rejection + // which is a fail. + expect(failPromise).rejects.toEqual(gotTxnIds[0]); + expect(failPromise2).rejects.toEqual(gotTxnIds[1]); + + const okPromise = slidingSync.setListRanges(0, [[0, 20]]); + let txnId; + httpBackend.when("POST", syncUrl).check((req) => { + txnId = req.data.txn_id; + }).respond(200, () => { + // include the txn_id, earlier requests should now be reject()ed. + return { + pos: "g", + txn_id: txnId, + }; + }); + await httpBackend.flushAllExpected(); + await okPromise; + + expect(txnId).toBeDefined(); + }); + it("should not reject later pending promises if an earlier transaction is acknowledged", async () => { + // i.e if we have [A,B,C] and see txn_id=B then C should not be rejected but A should. + const gotTxnIds = []; + const pushTxn = function(req) { + gotTxnIds.push(req.data.txn_id); + }; + const A = slidingSync.setListRanges(0, [[20, 40]]); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "A" }); + await httpBackend.flushAllExpected(); + const B = slidingSync.setListRanges(0, [[60, 70]]); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "B" }); // missing txn_id + await httpBackend.flushAllExpected(); + + // attach rejection handlers now else if we do it later Jest treats that as an unhandled rejection + // which is a fail. + expect(A).rejects.toEqual(gotTxnIds[0]); + + const C = slidingSync.setListRanges(0, [[0, 20]]); + let pendingC = true; + C.finally(() => { + pendingC = false; + }); + httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, () => { + // include the txn_id for B, so C's promise is outstanding + return { + pos: "C", + txn_id: gotTxnIds[1], + }; + }); + await httpBackend.flushAllExpected(); + // A is rejected, see above + expect(B).resolves.toEqual(gotTxnIds[1]); // B is resolved + expect(pendingC).toBe(true); // C is pending still + }); + it("should do nothing for unknown txn_ids", async () => { + const promise = slidingSync.setListRanges(0, [[20, 40]]); + let pending = true; + promise.finally(() => { + pending = false; + }); + let txnId; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("txn got ", body); + expect(body.room_subscriptions).toBeFalsy(); + expect(body.lists[0]).toEqual({ + ranges: [[20, 40]], + }); + expect(body.txn_id).toBeTruthy(); + txnId = body.txn_id; + }).respond(200, function() { + return { + pos: "ccc", + txn_id: "bogus transaction id", + lists: [{ count: 5 }], + extensions: {}, + }; + }); + await httpBackend.flushAllExpected(); + expect(txnId).toBeDefined(); + expect(pending).toBe(true); + slidingSync.stop(); + }); + }); + describe("extensions", () => { beforeAll(setupClient); afterAll(teardownClient); diff --git a/src/sliding-sync.ts b/src/sliding-sync.ts index 5254a077dcf..9ce62a07333 100644 --- a/src/sliding-sync.ts +++ b/src/sliding-sync.ts @@ -68,6 +68,7 @@ export interface MSC3575SlidingSyncRequest { unsubscribe_rooms?: string[]; room_subscriptions?: Record; extensions?: object; + txn_id?: string; // query params pos?: string; @@ -126,6 +127,7 @@ type Operation = DeleteOperation | InsertOperation | InvalidateOperation | SyncO */ export interface MSC3575SlidingSyncResponse { pos: string; + txn_id?: string; lists: ListResponse[]; rooms: Record; extensions: object; @@ -334,6 +336,11 @@ export class SlidingSync extends TypedEventEmitter = {}; @@ -404,9 +411,9 @@ export class SlidingSync extends TypedEventEmitter { this.lists[index].updateListRange(ranges); - this.resend(); + return this.resend(); } /** @@ -415,14 +422,14 @@ export class SlidingSync extends TypedEventEmitter { if (this.lists[index]) { this.lists[index].replaceList(list); } else { this.lists[index] = new SlidingList(list); } this.listModifiedCount += 1; - this.resend(); + return this.resend(); } /** @@ -439,9 +446,9 @@ export class SlidingSync extends TypedEventEmitter) { + public modifyRoomSubscriptions(s: Set): Promise { this.desiredRoomSubscriptions = s; - this.resend(); + return this.resend(); } /** @@ -449,10 +456,10 @@ export class SlidingSync extends TypedEventEmitter { this.roomSubscriptionInfo = rs; this.confirmedRoomSubscriptions = new Set(); - this.resend(); + return this.resend(); } /** @@ -615,11 +622,52 @@ export class SlidingSync extends TypedEventEmitter { this.needsResend = true; + this.txnId = ""+Math.random(); + const p: Promise = new Promise((resolve, reject) => { + this.txnIdDefers.push({ + txnId: this.txnId, + resolve: resolve, + reject: reject, + }); + }); this.pendingReq?.abort(); + return p; + } + + private resolveTransactionDefers(txnId?: string) { + if (!txnId) { + return; + } + // find the matching index + let txnIndex = -1; + for (let i = 0; i < this.txnIdDefers.length; i++) { + if (this.txnIdDefers[i].txnId === txnId) { + txnIndex = i; + break; + } + } + if (txnIndex === -1) { + // this shouldn't happen; we shouldn't be seeing txn_ids for things we don't know about, + // whine about it. + logger.warn(`resolveTransactionDefers: seen ${txnId} but it isn't a pending txn, ignoring.`); + return; + } + // This list is sorted in time, so if the input txnId ACKs in the middle of this array, + // then everything before it that hasn't been ACKed yet never will and we should reject them. + for (let i = 0; i < txnIndex; i++) { + if (i < txnIndex) { + this.txnIdDefers[i].reject(this.txnIdDefers[i].txnId); + } + } + this.txnIdDefers[txnIndex].resolve(txnId); + // clear out settled promises, incuding the one we resolved. + this.txnIdDefers = this.txnIdDefers.slice(txnIndex+1); } /** @@ -666,6 +714,10 @@ export class SlidingSync extends TypedEventEmitter Date: Mon, 8 Aug 2022 14:26:24 +0100 Subject: [PATCH 2/3] Review comments --- src/sliding-sync.ts | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/sliding-sync.ts b/src/sliding-sync.ts index 9ce62a07333..9d5cefe02b3 100644 --- a/src/sliding-sync.ts +++ b/src/sliding-sync.ts @@ -19,7 +19,7 @@ import { IAbortablePromise } from "./@types/partials"; import { MatrixClient } from "./client"; import { IRoomEvent, IStateEvent } from "./sync-accumulator"; import { TypedEventEmitter } from "./models//typed-event-emitter"; -import { sleep } from "./utils"; +import { sleep, IDeferred, defer } from "./utils"; // /sync requests allow you to set a timeout= but the request may continue // beyond that and wedge forever, so we need to track how long we are willing @@ -340,7 +340,7 @@ export class SlidingSync extends TypedEventEmitter & { txnId: string})[] = []; // map of extension name to req/resp handler private extensions: Record = {}; @@ -410,6 +410,9 @@ export class SlidingSync extends TypedEventEmitter { this.lists[index].updateListRange(ranges); @@ -421,6 +424,9 @@ export class SlidingSync extends TypedEventEmitter { if (this.lists[index]) { @@ -445,6 +451,9 @@ export class SlidingSync extends TypedEventEmitter): Promise { this.desiredRoomSubscriptions = s; @@ -455,6 +464,9 @@ export class SlidingSync extends TypedEventEmitter { this.roomSubscriptionInfo = rs; @@ -628,16 +640,14 @@ export class SlidingSync extends TypedEventEmitter { this.needsResend = true; - this.txnId = ""+Math.random(); - const p: Promise = new Promise((resolve, reject) => { - this.txnIdDefers.push({ - txnId: this.txnId, - resolve: resolve, - reject: reject, - }); + this.txnId = this.client.makeTxnId(); + const d = defer(); + this.txnIdDefers.push({ + ...d, + txnId: this.txnId, }); this.pendingReq?.abort(); - return p; + return d.promise; } private resolveTransactionDefers(txnId?: string) { From fa9f078a75794de5f46bf081409dd55a30cd4dc7 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 10 Aug 2022 11:53:13 +0100 Subject: [PATCH 3/3] Review comments --- spec/integ/sliding-sync.spec.ts | 10 +++++----- src/sliding-sync.ts | 8 +++++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/spec/integ/sliding-sync.spec.ts b/spec/integ/sliding-sync.spec.ts index cc7a1d0dd93..8c4a7ad1254 100644 --- a/spec/integ/sliding-sync.spec.ts +++ b/spec/integ/sliding-sync.spec.ts @@ -584,7 +584,7 @@ describe("SlidingSync", () => { let txnId; httpBackend.when("POST", syncUrl).check(function(req) { const body = req.data; - logger.log("txn got ", body); + logger.debug("got ", body); expect(body.room_subscriptions).toBeTruthy(); expect(body.room_subscriptions[roomId]).toEqual(roomSubInfo); expect(body.txn_id).toBeTruthy(); @@ -616,7 +616,7 @@ describe("SlidingSync", () => { let txnId; httpBackend.when("POST", syncUrl).check(function(req) { const body = req.data; - logger.log("txn got ", body); + logger.debug("got ", body); expect(body.room_subscriptions).toBeFalsy(); expect(body.lists[0]).toEqual(newList); expect(body.txn_id).toBeTruthy(); @@ -638,7 +638,7 @@ describe("SlidingSync", () => { let txnId; httpBackend.when("POST", syncUrl).check(function(req) { const body = req.data; - logger.log("txn got ", body); + logger.debug("got ", body); expect(body.room_subscriptions).toBeFalsy(); expect(body.lists[0]).toEqual({ ranges: [[20, 40]], @@ -664,7 +664,7 @@ describe("SlidingSync", () => { let txnId; httpBackend.when("POST", syncUrl).check(function(req) { const body = req.data; - logger.log("txn got ", body); + logger.debug("got ", body); expect(body.room_subscriptions).toBeTruthy(); expect(body.room_subscriptions[roomId]).toEqual({ timeline_limit: 99, @@ -759,7 +759,7 @@ describe("SlidingSync", () => { let txnId; httpBackend.when("POST", syncUrl).check(function(req) { const body = req.data; - logger.log("txn got ", body); + logger.debug("got ", body); expect(body.room_subscriptions).toBeFalsy(); expect(body.lists[0]).toEqual({ ranges: [[20, 40]], diff --git a/src/sliding-sync.ts b/src/sliding-sync.ts index 9d5cefe02b3..28026b3a999 100644 --- a/src/sliding-sync.ts +++ b/src/sliding-sync.ts @@ -639,6 +639,10 @@ export class SlidingSync extends TypedEventEmitter { + if (this.needsResend && this.txnIdDefers.length > 0) { + // we already have a resend queued, so just return the same promise + return this.txnIdDefers[this.txnIdDefers.length-1].promise; + } this.needsResend = true; this.txnId = this.client.makeTxnId(); const d = defer(); @@ -671,9 +675,7 @@ export class SlidingSync extends TypedEventEmitter