From 833d6813af12bfea4ef5cc569261ecd8932edc44 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Mar 2021 16:59:16 +0000 Subject: [PATCH 1/3] Make federation catchup send last event from any server. Currently federation catchup will send the last *local* event that we failed to send to the remote. This can cause issues for large rooms where lots of servers have sent events while the remote server was down, as when it comes back up again it'll be flooded with events from various points in the DAG. Instead, let's make it so that all the servers send the most recent events, even if its not theirs. The remote should deduplicate the events, so there shouldn't be much overhead in doing this. Alternatively, the servers could only send local events if they were also extremities and hope that the other server will send the event over, but that is a bit risky. --- synapse/federation/federation_server.py | 25 +---- .../sender/per_destination_queue.py | 102 +++++++++++++++--- tests/federation/test_federation_catch_up.py | 49 +++++++++ 3 files changed, 138 insertions(+), 38 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9839d3d0160b..d84e362070d9 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -35,7 +35,7 @@ from twisted.internet.abstract import isIPAddress from twisted.python import failure -from synapse.api.constants import EduTypes, EventTypes, Membership +from synapse.api.constants import EduTypes, EventTypes from synapse.api.errors import ( AuthError, Codes, @@ -63,7 +63,7 @@ ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, ) -from synapse.types import JsonDict, get_domain_from_id +from synapse.types import JsonDict from synapse.util import glob_to_regex, json_decoder, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -727,27 +727,6 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None: if the event was unacceptable for any other reason (eg, too large, too many prev_events, couldn't find the prev_events) """ - # check that it's actually being sent from a valid destination to - # workaround bug #1753 in 0.18.5 and 0.18.6 - if origin != get_domain_from_id(pdu.sender): - # We continue to accept join events from any server; this is - # necessary for the federation join dance to work correctly. - # (When we join over federation, the "helper" server is - # responsible for sending out the join event, rather than the - # origin. See bug #1893. This is also true for some third party - # invites). - if not ( - pdu.type == "m.room.member" - and pdu.content - and pdu.content.get("membership", None) - in (Membership.JOIN, Membership.INVITE) - ): - logger.info( - "Discarding PDU %s from invalid origin %s", pdu.event_id, origin - ) - return - else: - logger.info("Accepting join PDU %s from %s", pdu.event_id, origin) # We've already checked that we know the room version by this point room_version = await self.store.get_room_version(pdu.room_id) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index cc0d765e5f2f..4a93f0753b46 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -15,7 +15,7 @@ # limitations under the License. import datetime import logging -from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple import attr from prometheus_client import Counter @@ -77,6 +77,7 @@ def __init__( self._transaction_manager = transaction_manager self._instance_name = hs.get_instance_name() self._federation_shard_config = hs.config.worker.federation_shard_config + self._state = hs.get_state_handler() self._should_send_on_this_instance = True if not self._federation_shard_config.should_handle( @@ -415,22 +416,93 @@ async def _catch_up_transmission_loop(self) -> None: "This should not happen." % event_ids ) - if logger.isEnabledFor(logging.INFO): - rooms = [p.room_id for p in catchup_pdus] - logger.info("Catching up rooms to %s: %r", self._destination, rooms) + # We send transactions with events from one room only, as its likely + # that the remote will have to do additional processing, which may + # take some time. It's better to give it small amounts of work + # rather than risk the request timing out and repeatedly being + # retried, and not making any progress. + for pdu in catchup_pdus: + # The PDU from the DB will be the last PDU in the room from + # *this server* that wasn't sent to the remote. However, other + # servers may have sent lots of events since then, and we want + # to try and tell the remote only about the *latest* events in + # the room. This is so that it doesn't get inundated by events + # from various parts of the DAG, which all need to be processed. + # + # Note: this does mean that in large rooms a server coming back + # online will get sent the same events from all the different + # servers, but the remote will correctly deduplicate them and + # handle it only once. + + # Step 1, fetch the current extremities + extrems = await self._store.get_prev_events_for_room(pdu.room_id) + + if pdu.event_id in extrems: + # If the event is in the extremities, then great! We can just + # use that without having to do further checks. + room_catchup_pdus = [pdu] + else: + # If not, fetch the extremities and figure out which we can + # send. + extrem_events = await self._store.get_events_as_list(extrems) + + new_pdus = [] + for p in extrem_events: + # We pulled this from the DB, so it'll be non-null + assert p.internal_metadata.stream_ordering + + # Filter out events that happened before the remote went + # offline + if ( + p.internal_metadata.stream_ordering + < self._last_successful_stream_ordering + ): + continue - await self._transaction_manager.send_new_transaction( - self._destination, catchup_pdus, [] - ) + # Filter out events where the server is not in the room, + # e.g. it may have left/been kicked. *Ideally* we'd pull + # out the kick and send that, but it's a rare edge case + # so we don't bother for now (the server that sent the + # kick should send it out if its online). + hosts = await self._state.get_hosts_in_room_at_events( + p.room_id, [p.event_id] + ) + if self._destination not in hosts: + continue - sent_transactions_counter.inc() - final_pdu = catchup_pdus[-1] - self._last_successful_stream_ordering = cast( - int, final_pdu.internal_metadata.stream_ordering - ) - await self._store.set_destination_last_successful_stream_ordering( - self._destination, self._last_successful_stream_ordering - ) + new_pdus.append(p) + + # If we've filtered out all the extremities, fall back to + # sending the original event. This should ensure that the + # server gets at least some of missed events (especially if + # the other sending servers are up). + if new_pdus: + room_catchup_pdus = new_pdus + + logger.info( + "Catching up rooms to %s: %r", self._destination, pdu.room_id + ) + + await self._transaction_manager.send_new_transaction( + self._destination, room_catchup_pdus, [] + ) + + sent_transactions_counter.inc() + + # We pulled this from the DB, so it'll be non-null + assert pdu.internal_metadata.stream_ordering + + # Note that we mark the last successful stream ordering as that + # from the *original* PDU, rather than the PDU(s) we actually + # send. This is because we use it to mark our position in the + # queue of missed PDUs to process. + self._last_successful_stream_ordering = ( + pdu.internal_metadata.stream_ordering + ) + + await self._store.set_destination_last_successful_stream_ordering( + self._destination, self._last_successful_stream_ordering + ) def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: if not self._pending_rrs: diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 6f96cd79403e..95eac6a5a34c 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -2,6 +2,7 @@ from mock import Mock +from synapse.api.constants import EventTypes from synapse.events import EventBase from synapse.federation.sender import PerDestinationQueue, TransactionManager from synapse.federation.units import Edu @@ -421,3 +422,51 @@ def wake_destination_track(destination): self.assertNotIn("zzzerver", woken) # - all destinations are woken exactly once; they appear once in woken. self.assertCountEqual(woken, server_names[:-1]) + + @override_config({"send_federation": True}) + def test_not_latest_event(self): + """Test that we send the latest event in the room even if its not ours.""" + + per_dest_queue, sent_pdus = self.make_fake_destination_queue() + + # Make a room with a local user, and two servers. One will go offline + # and one will send some events. + self.register_user("u1", "you the one") + u1_token = self.login("u1", "you the one") + room_1 = self.helper.create_room_as("u1", tok=u1_token) + + self.get_success( + event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join") + ) + event_1 = self.get_success( + event_injection.inject_member_event(self.hs, room_1, "@user:host3", "join") + ) + + # First we send something from the local server, so that we notice the + # remote is down and go into catchup mode. + self.helper.send(room_1, "you hear me!!", tok=u1_token) + + # Now simulate us receiving an event from the still online remote. + event_2 = self.get_success( + event_injection.inject_event( + self.hs, + type=EventTypes.Message, + sender="@user:host3", + room_id=room_1, + content={"msgtype": "m.text", "body": "Hello"}, + ) + ) + + self.get_success( + self.hs.get_datastore().set_destination_last_successful_stream_ordering( + "host2", event_1.internal_metadata.stream_ordering + ) + ) + + self.get_success(per_dest_queue._catch_up_transmission_loop()) + + # We expect only the last message from the remote, event_2, to have been + # sent, rather than the last *local* event that was sent. + self.assertEqual(len(sent_pdus), 1) + self.assertEqual(sent_pdus[0].event_id, event_2.event_id) + self.assertFalse(per_dest_queue._catching_up) From 54571595598e8138f6ed069275e9febab1d06dd1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Mar 2021 17:47:40 +0000 Subject: [PATCH 2/3] Newsfile --- changelog.d/9640.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9640.misc diff --git a/changelog.d/9640.misc b/changelog.d/9640.misc new file mode 100644 index 000000000000..3d410ed4cdfe --- /dev/null +++ b/changelog.d/9640.misc @@ -0,0 +1 @@ +Improve performance of federation catch up by sending events the latest events in the room to the remote, rather than just the last event sent by the local server. From 2cf843ec2c6d7eed9c1c4c02a065bc457ad32f9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Mar 2021 14:40:25 +0000 Subject: [PATCH 3/3] Add comment --- synapse/federation/sender/per_destination_queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 4a93f0753b46..af85fe0a1ec1 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -421,6 +421,8 @@ async def _catch_up_transmission_loop(self) -> None: # take some time. It's better to give it small amounts of work # rather than risk the request timing out and repeatedly being # retried, and not making any progress. + # + # Note: `catchup_pdus` will have exactly one PDU per room. for pdu in catchup_pdus: # The PDU from the DB will be the last PDU in the room from # *this server* that wasn't sent to the remote. However, other