diff --git a/changelog.d/8929.misc b/changelog.d/8929.misc new file mode 100644 index 000000000000..157018b6a686 --- /dev/null +++ b/changelog.d/8929.misc @@ -0,0 +1 @@ +Automatically drop stale forward-extremities under some specific conditions. diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 592abd844b8a..d2768a088193 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -95,6 +95,8 @@ class EventTypes: Presence = "m.presence" + Dummy = "org.matrix.dummy_event" + class RejectedReason: AUTH_ERROR = "auth_error" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 96843338aefd..02964540a351 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1261,7 +1261,7 @@ async def _send_dummy_event_for_room(self, room_id: str) -> bool: event, context = await self.create_event( requester, { - "type": "org.matrix.dummy_event", + "type": EventTypes.Dummy, "content": {}, "room_id": room_id, "sender": user_id, diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 70e636b0bac0..61fc49c69c8d 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -31,7 +31,14 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.databases import Databases from synapse.storage.databases.main.events import DeltaState -from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap +from synapse.storage.databases.main.events_worker import EventRedactBehaviour +from synapse.types import ( + Collection, + PersistedEventPosition, + RoomStreamToken, + StateMap, + get_domain_from_id, +) from synapse.util.async_helpers import ObservableDeferred from synapse.util.metrics import Measure @@ -68,6 +75,21 @@ buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), ) +state_resolutions_during_persistence = Counter( + "synapse_storage_events_state_resolutions_during_persistence", + "Number of times we had to do state res to calculate new current state", +) + +potential_times_prune_extremities = Counter( + "synapse_storage_events_potential_times_prune_extremities", + "Number of times we might be able to prune extremities", +) + +times_pruned_extremities = Counter( + "synapse_storage_events_times_pruned_extremities", + "Number of times we were actually be able to prune extremities", +) + class _EventPeristenceQueue: """Queues up events so that they can be persisted in bulk with only one @@ -454,7 +476,15 @@ async def _persist_events( latest_event_ids, new_latest_event_ids, ) - current_state, delta_ids = res + current_state, delta_ids, new_latest_event_ids = res + + # there should always be at least one forward extremity. + # (except during the initial persistence of the send_join + # results, in which case there will be no existing + # extremities, so we'll `continue` above and skip this bit.) + assert new_latest_event_ids, "No forward extremities left!" + + new_forward_extremeties[room_id] = new_latest_event_ids # If either are not None then there has been a change, # and we need to work out the delta (or use that @@ -573,29 +603,35 @@ async def _get_new_state_after_events( self, room_id: str, events_context: List[Tuple[EventBase, EventContext]], - old_latest_event_ids: Iterable[str], - new_latest_event_ids: Iterable[str], - ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]]]: + old_latest_event_ids: Set[str], + new_latest_event_ids: Set[str], + ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]], Set[str]]: """Calculate the current state dict after adding some new events to a room Args: - room_id (str): + room_id: room to which the events are being added. Used for logging etc - events_context (list[(EventBase, EventContext)]): + events_context: events and contexts which are being added to the room - old_latest_event_ids (iterable[str]): + old_latest_event_ids: the old forward extremities for the room. - new_latest_event_ids (iterable[str]): + new_latest_event_ids : the new forward extremities for the room. Returns: - Returns a tuple of two state maps, the first being the full new current - state and the second being the delta to the existing current state. - If both are None then there has been no change. + Returns a tuple of two state maps and a set of new forward + extremities. + + The first state map is the full new current state and the second + is the delta to the existing current state. If both are None then + there has been no change. + + The function may prune some old entries from the set of new + forward extremities if it's safe to do so. If there has been a change then we only return the delta if its already been calculated. Conversely if we do know the delta then @@ -672,7 +708,7 @@ async def _get_new_state_after_events( # If they old and new groups are the same then we don't need to do # anything. if old_state_groups == new_state_groups: - return None, None + return None, None, new_latest_event_ids if len(new_state_groups) == 1 and len(old_state_groups) == 1: # If we're going from one state group to another, lets check if @@ -689,7 +725,7 @@ async def _get_new_state_after_events( # the current state in memory then lets also return that, # but it doesn't matter if we don't. new_state = state_groups_map.get(new_state_group) - return new_state, delta_ids + return new_state, delta_ids, new_latest_event_ids # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -701,7 +737,7 @@ async def _get_new_state_after_events( if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - return state_groups_map[new_state_groups.pop()], None + return state_groups_map[new_state_groups.pop()], None, new_latest_event_ids # Ok, we need to defer to the state handler to resolve our state sets. @@ -734,7 +770,139 @@ async def _get_new_state_after_events( state_res_store=StateResolutionStore(self.main_store), ) - return res.state, None + state_resolutions_during_persistence.inc() + + # If the returned state matches the state group of one of the new + # forward extremities then we check if we are able to prune some state + # extremities. + if res.state_group and res.state_group in new_state_groups: + new_latest_event_ids = await self._prune_extremities( + room_id, + new_latest_event_ids, + res.state_group, + event_id_to_state_group, + events_context, + ) + + return res.state, None, new_latest_event_ids + + async def _prune_extremities( + self, + room_id: str, + new_latest_event_ids: Set[str], + resolved_state_group: int, + event_id_to_state_group: Dict[str, int], + events_context: List[Tuple[EventBase, EventContext]], + ) -> Set[str]: + """See if we can prune any of the extremities after calculating the + resolved state. + """ + potential_times_prune_extremities.inc() + + # We keep all the extremities that have the same state group, and + # see if we can drop the others. + new_new_extrems = { + e + for e in new_latest_event_ids + if event_id_to_state_group[e] == resolved_state_group + } + + dropped_extrems = set(new_latest_event_ids) - new_new_extrems + + logger.debug("Might drop extremities: %s", dropped_extrems) + + # We only drop events from the extremities list if: + # 1. we're not currently persisting them; + # 2. they're not our own events (or are dummy events); and + # 3. they're either: + # 1. over N hours old and more than N events ago (we use depth to + # calculate); or + # 2. we are persisting an event from the same domain and more than + # M events ago. + # + # The idea is that we don't want to drop events that are "legitimate" + # extremities (that we would want to include as prev events), only + # "stuck" extremities that are e.g. due to a gap in the graph. + # + # Note that we either drop all of them or none of them. If we only drop + # some of the events we don't know if state res would come to the same + # conclusion. + + for ev, _ in events_context: + if ev.event_id in dropped_extrems: + logger.debug( + "Not dropping extremities: %s is being persisted", ev.event_id + ) + return new_latest_event_ids + + dropped_events = await self.main_store.get_events( + dropped_extrems, + allow_rejected=True, + redact_behaviour=EventRedactBehaviour.AS_IS, + ) + + new_senders = {get_domain_from_id(e.sender) for e, _ in events_context} + + one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000 + current_depth = max(e.depth for e, _ in events_context) + for event in dropped_events.values(): + # If the event is a local dummy event then we should check it + # doesn't reference any local events, as we want to reference those + # if we send any new events. + # + # Note we do this recursively to handle the case where a dummy event + # references a dummy event that only references remote events. + # + # Ideally we'd figure out a way of still being able to drop old + # dummy events that reference local events, but this is good enough + # as a first cut. + events_to_check = [event] + while events_to_check: + new_events = set() + for event_to_check in events_to_check: + if self.is_mine_id(event_to_check.sender): + if event_to_check.type != EventTypes.Dummy: + logger.debug("Not dropping own event") + return new_latest_event_ids + new_events.update(event_to_check.prev_event_ids()) + + prev_events = await self.main_store.get_events( + new_events, + allow_rejected=True, + redact_behaviour=EventRedactBehaviour.AS_IS, + ) + events_to_check = prev_events.values() + + if ( + event.origin_server_ts < one_day_ago + and event.depth < current_depth - 100 + ): + continue + + # We can be less conservative about dropping extremities from the + # same domain, though we do want to wait a little bit (otherwise + # we'll immediately remove all extremities from a given server). + if ( + get_domain_from_id(event.sender) in new_senders + and event.depth < current_depth - 20 + ): + continue + + logger.debug( + "Not dropping as too new and not in new_senders: %s", new_senders, + ) + + return new_latest_event_ids + + times_pruned_extremities.inc() + + logger.info( + "Pruning forward extremities in room %s: from %s -> %s", + room_id, + new_latest_event_ids, + new_new_extrems, + ) + return new_new_extrems async def _calculate_state_delta( self, room_id: str, current_state: StateMap[str] diff --git a/synapse/visibility.py b/synapse/visibility.py index 527365498e68..01bb0ac9b693 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -116,7 +116,7 @@ def allowed(event): # see events in the room at that point in the DAG, and that shouldn't be decided # on those checks. if filter_send_to_client: - if event.type == "org.matrix.dummy_event": + if event.type == EventTypes.Dummy: return None if not event.is_state() and event.sender in ignore_list: diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py new file mode 100644 index 000000000000..71210ce6065c --- /dev/null +++ b/tests/storage/test_events.py @@ -0,0 +1,334 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from synapse.api.constants import EventTypes, Membership +from synapse.api.room_versions import RoomVersions +from synapse.federation.federation_base import event_from_pdu_json +from synapse.rest import admin +from synapse.rest.client.v1 import login, room + +from tests.unittest import HomeserverTestCase + + +class ExtremPruneTestCase(HomeserverTestCase): + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, homeserver): + self.state = self.hs.get_state_handler() + self.persistence = self.hs.get_storage().persistence + self.store = self.hs.get_datastore() + + self.register_user("user", "pass") + self.token = self.login("user", "pass") + + self.room_id = self.helper.create_room_as( + "user", room_version=RoomVersions.V6.identifier, tok=self.token + ) + + body = self.helper.send(self.room_id, body="Test", tok=self.token) + local_message_event_id = body["event_id"] + + # Fudge a remote event and persist it. This will be the extremity before + # the gap. + self.remote_event_1 = event_from_pdu_json( + { + "type": EventTypes.Message, + "state_key": "@user:other", + "content": {}, + "room_id": self.room_id, + "sender": "@user:other", + "depth": 5, + "prev_events": [local_message_event_id], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + self.persist_event(self.remote_event_1) + + # Check that the current extremities is the remote event. + self.assert_extremities([self.remote_event_1.event_id]) + + def persist_event(self, event, state=None): + """Persist the event, with optional state + """ + context = self.get_success( + self.state.compute_event_context(event, old_state=state) + ) + self.get_success(self.persistence.persist_event(event, context)) + + def assert_extremities(self, expected_extremities): + """Assert the current extremities for the room + """ + extremities = self.get_success( + self.store.get_prev_events_for_room(self.room_id) + ) + self.assertCountEqual(extremities, expected_extremities) + + def test_prune_gap(self): + """Test that we drop extremities after a gap when we see an event from + the same domain. + """ + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other", + "depth": 50, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([remote_event_2.event_id]) + + def test_do_not_prune_gap_if_state_different(self): + """Test that we don't prune extremities after a gap if the resolved + state is different. + """ + + # Fudge a second event which points to an event we don't have. + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Message, + "state_key": "@user:other", + "content": {}, + "room_id": self.room_id, + "sender": "@user:other", + "depth": 10, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + # Now we persist it with state with a dropped history visibility + # setting. The state resolution across the old and new event will then + # include it, and so the resolved state won't match the new state. + state_before_gap = dict( + self.get_success(self.state.get_current_state(self.room_id)) + ) + state_before_gap.pop(("m.room.history_visibility", "")) + + context = self.get_success( + self.state.compute_event_context( + remote_event_2, old_state=state_before_gap.values() + ) + ) + + self.get_success(self.persistence.persist_event(remote_event_2, context)) + + # Check that we haven't dropped the old extremity. + self.assert_extremities([self.remote_event_1.event_id, remote_event_2.event_id]) + + def test_prune_gap_if_old(self): + """Test that we drop extremities after a gap when the previous extremity + is "old" + """ + + # Advance the clock for many days to make the old extremity "old". We + # also set the depth to "lots". + self.reactor.advance(7 * 24 * 60 * 60) + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10000, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([remote_event_2.event_id]) + + def test_do_not_prune_gap_if_other_server(self): + """Test that we do not drop extremities after a gap when we see an event + from a different domain. + """ + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([self.remote_event_1.event_id, remote_event_2.event_id]) + + def test_prune_gap_if_dummy_remote(self): + """Test that we drop extremities after a gap when the previous extremity + is a local dummy event and only points to remote events. + """ + + body = self.helper.send_event( + self.room_id, type=EventTypes.Dummy, content={}, tok=self.token + ) + local_message_event_id = body["event_id"] + self.assert_extremities([local_message_event_id]) + + # Advance the clock for many days to make the old extremity "old". We + # also set the depth to "lots". + self.reactor.advance(7 * 24 * 60 * 60) + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10000, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([remote_event_2.event_id]) + + def test_prune_gap_if_dummy_local(self): + """Test that we don't drop extremities after a gap when the previous + extremity is a local dummy event and points to local events. + """ + + body = self.helper.send(self.room_id, body="Test", tok=self.token) + + body = self.helper.send_event( + self.room_id, type=EventTypes.Dummy, content={}, tok=self.token + ) + local_message_event_id = body["event_id"] + self.assert_extremities([local_message_event_id]) + + # Advance the clock for many days to make the old extremity "old". We + # also set the depth to "lots". + self.reactor.advance(7 * 24 * 60 * 60) + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10000, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([remote_event_2.event_id, local_message_event_id]) + + def test_do_not_prune_gap_if_not_dummy(self): + """Test that we do not drop extremities after a gap when the previous extremity + is not a dummy event. + """ + + body = self.helper.send(self.room_id, body="test", tok=self.token) + local_message_event_id = body["event_id"] + self.assert_extremities([local_message_event_id]) + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10000, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([local_message_event_id, remote_event_2.event_id])