From 475d74f3889cae1bcc5dbcc1f2b7a0e92ac13b61 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 16 Feb 2024 14:28:34 +0000 Subject: [PATCH 1/5] `calculate_state`: add comments and a test --- synapse/handlers/sync.py | 64 ++++++++++++++++++++ tests/handlers/test_sync.py | 114 ++++++++++++++++++++++++++++++++---- 2 files changed, 166 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 08fe4eb3b36..73ebcda1959 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2771,6 +2771,70 @@ def _calculate_state( e for t, e in timeline_start.items() if t[0] == EventTypes.Member ) + # Naively, we would just return the difference between the state at the start + # of the timeline (`timeline_start_ids`) and that at the end of the previous sync + # (`previous_timeline_end_ids`). However, that fails in the presence of forks in + # the DAG. + # + # For example, consider a DAG such as the following: + # + # E1 + # ↗ ↖ + # | S2 + # | ↑ + # --|------|---- + # | | + # E3 | + # ↖ / + # E4 + # + # ... and a filter that means we only return 2 events, represented by the dashed + # horizontal line. Assuming S2 was *not* included in the previous sync, we need to + # include it in the `state` section. + # + # Note that the state at the start of the timeline (E3) does not include S2. So, + # to make sure it gets included in the calculation here, we actually look at + # the state at the *end* of the timeline, and subtract any events that are present + # in the timeline. + # + # ---------- + # + # Aside 1: You may then wonder if we need to include `timeline_start` in the + # calculation. Consider a linear DAG: + # + # E1 + # ↑ + # S2 + # ↑ + # ----|------ + # | + # E3 + # ↑ + # S4 + # ↑ + # E5 + # + # ... where S2 and S4 change the same piece of state; and where we have a filter + # that returns 3 events (E3, S4, E5). We still need to tell the client about S2, + # because it might affect the display of E3. However, the state at the end of the + # timeline only tells us about S4; if we don't inspect `timeline_start` we won't + # find out about S2. + # + # (There are yet more complicated cases in which a state event is excluded from the + # timeline, but whose effect actually lands in the DAG in the *middle* of the + # timeline. We have no way to represent that in the /sync response, and we don't + # even try; it is ether omitted or plonked into `state` as if it were at the start + # of the timeline, depending on what else is in the timeline.) + # + # ---------- + # + # Aside 2: it's worth noting that `timeline_end`, as provided to us, is actually + # the state *before* the final event in the timeline. In other words: if the final + # event in the timeline is a state event, it won't be included in `timeline_end`. + # However, that doesn't matter here, because the only difference can be in that + # one piece of state, and by definition that event is in the timeline, so we + # don't need to include it in the `state` section. + state_ids = ( (timeline_end_ids | timeline_start_ids) - previous_timeline_end_ids diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 37904926e30..96be94b8c9f 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -17,14 +17,14 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import Optional +from typing import ContextManager, List, Optional from unittest.mock import AsyncMock, Mock, patch from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventTypes, JoinRules from synapse.api.errors import Codes, ResourceLimitError -from synapse.api.filtering import Filtering +from synapse.api.filtering import FilterCollection, Filtering from synapse.api.room_versions import RoomVersions from synapse.handlers.sync import SyncConfig, SyncResult from synapse.rest import admin @@ -255,13 +255,7 @@ def test_ban_wins_race_with_join(self) -> None: # Eve tries to join the room. We monkey patch the internal logic which selects # the prev_events used when creating the join event, such that the ban does not # precede the join. - mocked_get_prev_events = patch.object( - self.hs.get_datastores().main, - "get_prev_events_for_room", - new_callable=AsyncMock, - return_value=[last_room_creation_event_id], - ) - with mocked_get_prev_events: + with self._patch_get_latest_events([last_room_creation_event_id]): self.helper.join(room_id, eve, tok=eve_token) # Eve makes a second, incremental sync. @@ -285,19 +279,115 @@ def test_ban_wins_race_with_join(self) -> None: ) self.assertEqual(eve_initial_sync_after_join.joined, []) + def test_state_includes_changes_on_forks(self) -> None: + """State changes that happen on a fork of the DAG must be included in `state` + + Given the following DAG: + + E1 + ↗ ↖ + | S2 + | ↑ + --|------|---- + | | + E3 | + ↖ / + E4 + + ... and a filter that means we only return 2 events, represented by the dashed + horizontal line: `S2` must be included in the `state` section. + """ + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + alice_requester = create_requester(alice) + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + + # Do an initial sync as Alice to get a known starting point. + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, generate_sync_config(alice) + ) + ) + last_room_creation_event_id = ( + initial_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Send a state event, and a regular event, both using the same prev ID + with self._patch_get_latest_events([last_room_creation_event_id]): + s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[ + "event_id" + ] + e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"] + + # Send a final event, joining the two branches of the dag + e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"] + + # do an incremental sync, with a filter that will ensure we only get two of + # the three new events. + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, {"room": {"timeline": {"limit": 2}}} + ), + ), + since_token=initial_sync_result.next_batch, + ) + ) + + # The state event should appear in the 'state' section of the response. + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertTrue(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e3_event, e4_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + + def _patch_get_latest_events(self, latest_events: List[str]) -> ContextManager: + """Monkey-patch `get_prev_events_for_room` + + Returns a context manager which will replace the implementation of + `get_prev_events_for_room` with one which returns `latest_events`. + """ + return patch.object( + self.hs.get_datastores().main, + "get_prev_events_for_room", + new_callable=AsyncMock, + return_value=latest_events, + ) + _request_key = 0 def generate_sync_config( - user_id: str, device_id: Optional[str] = "device_id" + user_id: str, + device_id: Optional[str] = "device_id", + filter_collection: Optional[FilterCollection] = None, ) -> SyncConfig: - """Generate a sync config (with a unique request key).""" + """Generate a sync config (with a unique request key). + + Args: + user_id: user who is syncing. + device_id: device that is syncing. Defaults to "device_id". + filter_collection: filter to apply. Defaults to the default filter (ie, + return everything, with a default limit) + """ + if filter_collection is None: + filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION + global _request_key _request_key += 1 return SyncConfig( user=UserID.from_string(user_id), - filter_collection=Filtering(Mock()).DEFAULT_FILTER_COLLECTION, + filter_collection=filter_collection, is_guest=False, request_key=("request_key", _request_key), device_id=device_id, From d3ae25fbe0d40ec95c05ad8368318eca239eedaf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 16 Feb 2024 22:10:41 +0000 Subject: [PATCH 2/5] Add an `end_token` to `RoomSyncResultBuilder` ... to represent the last point in the room we actually want to return state from. --- synapse/handlers/sync.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 73ebcda1959..e5303b3bffa 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2334,6 +2334,7 @@ async def _get_room_changes_for_incremental_sync( full_state=False, since_token=since_token, upto_token=leave_token, + end_token=leave_token, out_of_band=leave_event.internal_metadata.is_out_of_band_membership(), ) ) @@ -2371,6 +2372,7 @@ async def _get_room_changes_for_incremental_sync( full_state=False, since_token=None if newly_joined else since_token, upto_token=prev_batch_token, + end_token=now_token, ) else: entry = RoomSyncResultBuilder( @@ -2381,6 +2383,7 @@ async def _get_room_changes_for_incremental_sync( full_state=False, since_token=since_token, upto_token=since_token, + end_token=now_token, ) room_entries.append(entry) @@ -2439,6 +2442,7 @@ async def _get_room_changes_for_initial_sync( full_state=True, since_token=since_token, upto_token=now_token, + end_token=now_token, ) ) elif event.membership == Membership.INVITE: @@ -2468,6 +2472,7 @@ async def _get_room_changes_for_initial_sync( full_state=True, since_token=since_token, upto_token=leave_token, + end_token=leave_token, ) ) @@ -2930,13 +2935,30 @@ class RoomSyncResultBuilder: Attributes: room_id + rtype: One of `"joined"` or `"archived"` + events: List of events to include in the room (more events may be added when generating result). + newly_joined: If the user has newly joined the room + full_state: Whether the full state should be sent in result + since_token: Earliest point to return events from, or None - upto_token: Latest point to return events from. + + upto_token: Latest point to return events from. If `events` is populated, + this is set to the token at the start of `events` + + end_token: The last point in the timeline that the client should see events + from. Normally this will be the same as the global `now_token`, but in + the case of rooms where the user has left the room, this will be the point + just after their leave event. + + This is used in the calculation of the state which is returned in `state`: + any state changes *up to* `end_token` (and not beyond!) which are not + reflected in the timeline need to be returned in `state`. + out_of_band: whether the events in the room are "out of band" events and the server isn't in the room. """ @@ -2948,5 +2970,5 @@ class RoomSyncResultBuilder: full_state: bool since_token: Optional[StreamToken] upto_token: StreamToken - + end_token: StreamToken out_of_band: bool = False From 331546870d06c03f2fd44a13813b6498776c185e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 16 Feb 2024 22:40:43 +0000 Subject: [PATCH 3/5] Use `end_token` for state calculation Pass the `end_token` down into `compute_state_delta` instead of `now_token`, and use it to calculate the state at the end of the timeline. --- synapse/handlers/sync.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e5303b3bffa..fb99d86cc0d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -943,7 +943,7 @@ async def compute_state_delta( batch: TimelineBatch, sync_config: SyncConfig, since_token: Optional[StreamToken], - now_token: StreamToken, + end_token: StreamToken, full_state: bool, ) -> MutableStateMap[EventBase]: """Works out the difference in state between the end of the previous sync and @@ -954,7 +954,9 @@ async def compute_state_delta( batch: The timeline batch for the room that will be sent to the user. sync_config: since_token: Token of the end of the previous batch. May be `None`. - now_token: Token of the end of the current batch. + end_token: Token of the end of the current batch. Normally this will be + the same as the global "now_token", but if the user has left the room, + the point just after their leave event. full_state: Whether to force returning the full state. `lazy_load_members` still applies when `full_state` is `True`. @@ -1034,7 +1036,7 @@ async def compute_state_delta( room_id, sync_config.user, batch, - now_token, + end_token, members_to_fetch, timeline_state, ) @@ -1048,7 +1050,7 @@ async def compute_state_delta( room_id, batch, since_token, - now_token, + end_token, members_to_fetch, timeline_state, ) @@ -1120,7 +1122,7 @@ async def _compute_state_delta_for_full_sync( room_id: str, syncing_user: UserID, batch: TimelineBatch, - now_token: StreamToken, + end_token: StreamToken, members_to_fetch: Optional[Set[str]], timeline_state: StateMap[str], ) -> StateMap[str]: @@ -1133,7 +1135,9 @@ async def _compute_state_delta_for_full_sync( room_id: The room we are calculating for. syncing_user: The user that is calling `/sync`. batch: The timeline batch for the room that will be sent to the user. - now_token: Token of the end of the current batch. + end_token: Token of the end of the current batch. Normally this will be + the same as the global "now_token", but if the user has left the room, + the point just after their leave event. members_to_fetch: If lazy-loading is enabled, the memberships needed for events in the timeline. timeline_state: The contribution to the room state from state events in @@ -1192,7 +1196,7 @@ async def _compute_state_delta_for_full_sync( else: state_at_timeline_end = await self.get_state_at( room_id, - stream_position=now_token, + stream_position=end_token, state_filter=state_filter, await_full_state=await_full_state, ) @@ -1213,7 +1217,7 @@ async def _compute_state_delta_for_incremental_sync( room_id: str, batch: TimelineBatch, since_token: StreamToken, - now_token: StreamToken, + end_token: StreamToken, members_to_fetch: Optional[Set[str]], timeline_state: StateMap[str], ) -> StateMap[str]: @@ -1229,7 +1233,9 @@ async def _compute_state_delta_for_incremental_sync( room_id: The room we are calculating for. batch: The timeline batch for the room that will be sent to the user. since_token: Token of the end of the previous batch. - now_token: Token of the end of the current batch. + end_token: Token of the end of the current batch. Normally this will be + the same as the global "now_token", but if the user has left the room, + the point just after their leave event. members_to_fetch: If lazy-loading is enabled, the memberships needed for events in the timeline. Otherwise, `None`. timeline_state: The contribution to the room state from state events in @@ -1263,7 +1269,7 @@ async def _compute_state_delta_for_incremental_sync( # the recent events. state_at_timeline_start = await self.get_state_at( room_id, - stream_position=now_token, + stream_position=end_token, state_filter=state_filter, await_full_state=await_full_state, ) @@ -1302,7 +1308,7 @@ async def _compute_state_delta_for_incremental_sync( # the recent events. state_at_timeline_end = await self.get_state_at( room_id, - stream_position=now_token, + stream_position=end_token, state_filter=state_filter, await_full_state=await_full_state, ) @@ -2543,6 +2549,7 @@ async def _generate_room_entry( { "since_token": since_token, "upto_token": upto_token, + "end_token": room_builder.end_token, } ) @@ -2616,7 +2623,7 @@ async def _generate_room_entry( batch, sync_config, since_token, - now_token, + room_builder.end_token, full_state=full_state, ) else: From cb87384a9b0781b9d75a35e96a31094d361b1ae4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 17 Feb 2024 01:19:10 +0000 Subject: [PATCH 4/5] Add test for archived rooms --- tests/handlers/test_sync.py | 94 ++++++++++++++++++++++++++++++++++++- tests/rest/client/utils.py | 18 ++++--- 2 files changed, 104 insertions(+), 8 deletions(-) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 96be94b8c9f..128853702ea 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -20,6 +20,8 @@ from typing import ContextManager, List, Optional from unittest.mock import AsyncMock, Mock, patch +from parameterized import parameterized + from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventTypes, JoinRules @@ -30,7 +32,7 @@ from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer -from synapse.types import UserID, create_requester +from synapse.types import JsonDict, UserID, create_requester from synapse.util import Clock import tests.unittest @@ -350,6 +352,96 @@ def test_state_includes_changes_on_forks(self) -> None: [s2_event], ) + @parameterized.expand( + [ + (False, False), + (True, False), + (False, True), + (True, True), + ] + ) + def test_archived_rooms_do_not_include_state_after_leave( + self, initial_sync: bool, empty_timeline: bool + ) -> None: + """If the user leaves the room, state changes that happen after they leave are not returned. + + We try with both a zero and a normal timeline limit, + and we try both an initial sync and an incremental sync for both. + """ + if empty_timeline and not initial_sync: + # FIXME synapse doesn't return the room at all in this situation! + self.skipTest("Synapse does not correctly handle this case") + + # Alice creates the room, and bob joins. + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + + bob = self.register_user("bob", "password") + bob_tok = self.login(bob, "password") + bob_requester = create_requester(bob) + + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + self.helper.join(room_id, bob, tok=bob_tok) + + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + bob_requester, generate_sync_config(bob) + ) + ) + + # Alice sends a message and a state + before_message_event = self.helper.send(room_id, "before", tok=alice_tok)[ + "event_id" + ] + before_state_event = self.helper.send_state( + room_id, "test_state", {"body": "before"}, tok=alice_tok + )["event_id"] + + # Bob leaves + leave_event = self.helper.leave(room_id, bob, tok=bob_tok)["event_id"] + + # Alice sends some more stuff + self.helper.send(room_id, "after", tok=alice_tok)["event_id"] + self.helper.send_state(room_id, "test_state", {"body": "after"}, tok=alice_tok)[ + "event_id" + ] + + # And now, Bob resyncs. + filter_dict: JsonDict = {"room": {"include_leave": True}} + if empty_timeline: + filter_dict["room"]["timeline"] = {"limit": 0} + sync_room_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + bob_requester, + generate_sync_config( + bob, filter_collection=FilterCollection(self.hs, filter_dict) + ), + since_token=None if initial_sync else initial_sync_result.next_batch, + ) + ).archived[0] + + if empty_timeline: + # The timeline should be empty + self.assertEqual(sync_room_result.timeline.events, []) + + # And the state should include the leave event... + self.assertEqual( + sync_room_result.state[("m.room.member", bob)].event_id, leave_event + ) + # ... and the state change before he left. + self.assertEqual( + sync_room_result.state[("test_state", "")].event_id, before_state_event + ) + else: + # The last three events in the timeline should be those leading up to the + # leave + self.assertEqual( + [e.event_id for e in sync_room_result.timeline.events[-3:]], + [before_message_event, before_state_event, leave_event], + ) + # ... And the state should be empty + self.assertEqual(sync_room_result.state, {}) + def _patch_get_latest_events(self, latest_events: List[str]) -> ContextManager: """Monkey-patch `get_prev_events_for_room` diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index daa68d78b93..fe00afe1986 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -170,8 +170,8 @@ def invite( targ: Optional[str] = None, expect_code: int = HTTPStatus.OK, tok: Optional[str] = None, - ) -> None: - self.change_membership( + ) -> JsonDict: + return self.change_membership( room=room, src=src, targ=targ, @@ -189,8 +189,8 @@ def join( appservice_user_id: Optional[str] = None, expect_errcode: Optional[Codes] = None, expect_additional_fields: Optional[dict] = None, - ) -> None: - self.change_membership( + ) -> JsonDict: + return self.change_membership( room=room, src=user, targ=user, @@ -242,8 +242,8 @@ def leave( user: Optional[str] = None, expect_code: int = HTTPStatus.OK, tok: Optional[str] = None, - ) -> None: - self.change_membership( + ) -> JsonDict: + return self.change_membership( room=room, src=user, targ=user, @@ -282,7 +282,7 @@ def change_membership( expect_code: int = HTTPStatus.OK, expect_errcode: Optional[str] = None, expect_additional_fields: Optional[dict] = None, - ) -> None: + ) -> JsonDict: """ Send a membership state event into a room. @@ -298,6 +298,9 @@ def change_membership( using an application service access token in `tok`. expect_code: The expected HTTP response code expect_errcode: The expected Matrix error code + + Returns: + The JSON response """ temp_id = self.auth_user_id self.auth_user_id = src @@ -356,6 +359,7 @@ def change_membership( ) self.auth_user_id = temp_id + return channel.json_body def send( self, From 0a0d43278365203824e52df7af51a32bf112d991 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 17 Feb 2024 01:30:26 +0000 Subject: [PATCH 5/5] changelog --- changelog.d/16932.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16932.bugfix diff --git a/changelog.d/16932.bugfix b/changelog.d/16932.bugfix new file mode 100644 index 00000000000..624388ea8e0 --- /dev/null +++ b/changelog.d/16932.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug which could cause incorrect state to be returned from `/sync` for rooms where the user has left. \ No newline at end of file