diff --git a/changelog.d/9905.feature b/changelog.d/9905.feature new file mode 100644 index 000000000000..96a0e7f09fbc --- /dev/null +++ b/changelog.d/9905.feature @@ -0,0 +1 @@ +Improve performance of sending events for worker-based deployments using Redis. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9d867aaf4d55..e8330a2b50db 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2446,7 +2446,9 @@ async def _check_event_auth( # If we are going to send this event over federation we precaclculate # the joined hosts. if event.internal_metadata.get_send_on_behalf_of(): - await self.event_creation_handler.cache_joined_hosts_for_event(event) + await self.event_creation_handler.cache_joined_hosts_for_event( + event, context + ) return context diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ec8eb2167463..714ee77b205c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -51,6 +51,7 @@ from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester from synapse.util import json_decoder, json_encoder from synapse.util.async_helpers import Linearizer +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -447,6 +448,19 @@ def __init__(self, hs: "HomeServer"): self._external_cache = hs.get_external_cache() + # Stores the state groups we've recently added to the joined hosts + # external cache. Note that the timeout must be significantly less than + # the TTL on the external cache. + self._external_cache_joined_hosts_updates = ( + None + ) # type: Optional[ExpiringCache] + if self._external_cache.is_enabled(): + self._external_cache_joined_hosts_updates = ExpiringCache( + "_external_cache_joined_hosts_updates", + self.clock, + expiry_ms=30 * 60 * 1000, + ) + async def create_event( self, requester: Requester, @@ -957,7 +971,7 @@ async def handle_new_client_event( await self.action_generator.handle_push_actions_for_event(event, context) - await self.cache_joined_hosts_for_event(event) + await self.cache_joined_hosts_for_event(event, context) try: # If we're a worker we need to hit out to the master. @@ -998,7 +1012,9 @@ async def handle_new_client_event( await self.store.remove_push_actions_from_staging(event.event_id) raise - async def cache_joined_hosts_for_event(self, event: EventBase) -> None: + async def cache_joined_hosts_for_event( + self, event: EventBase, context: EventContext + ) -> None: """Precalculate the joined hosts at the event, when using Redis, so that external federation senders don't have to recalculate it themselves. """ @@ -1006,6 +1022,9 @@ async def cache_joined_hosts_for_event(self, event: EventBase) -> None: if not self._external_cache.is_enabled(): return + # If external cache is enabled we should always have this. + assert self._external_cache_joined_hosts_updates is not None + # We actually store two mappings, event ID -> prev state group, # state group -> joined hosts, which is much more space efficient # than event ID -> joined hosts. @@ -1013,16 +1032,21 @@ async def cache_joined_hosts_for_event(self, event: EventBase) -> None: # Note: We have to cache event ID -> prev state group, as we don't # store that in the DB. # - # Note: We always set the state group -> joined hosts cache, even if - # we already set it, so that the expiry time is reset. + # Note: We set the state group -> joined hosts cache if it hasn't been + # set for a while, so that the expiry time is reset. state_entry = await self.state.resolve_state_groups_for_events( event.room_id, event_ids=event.prev_event_ids() ) if state_entry.state_group: + if state_entry.state_group in self._external_cache_joined_hosts_updates: + return + joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry) + # Note that the expiry times must be larger than the expiry time in + # _external_cache_joined_hosts_updates. await self._external_cache.set( "event_to_prev_state_group", event.event_id, @@ -1036,6 +1060,8 @@ async def cache_joined_hosts_for_event(self, event: EventBase) -> None: expiry_ms=60 * 60 * 1000, ) + self._external_cache_joined_hosts_updates[state_entry.state_group] = None + async def _validate_canonical_alias( self, directory_handler, room_alias_str: str, expected_room_id: str ) -> None: