Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't set the external cache if its been done recently #9905

Merged
merged 5 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9905.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of sending events for worker-based deployments using Redis.
4 changes: 3 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2446,7 +2446,9 @@ async def _check_event_auth(
# If we are going to send this event over federation we precaclculate
# the joined hosts.
if event.internal_metadata.get_send_on_behalf_of():
await self.event_creation_handler.cache_joined_hosts_for_event(event)
await self.event_creation_handler.cache_joined_hosts_for_event(
event, context
)

return context

Expand Down
34 changes: 30 additions & 4 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder, json_encoder
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client

Expand Down Expand Up @@ -447,6 +448,19 @@ def __init__(self, hs: "HomeServer"):

self._external_cache = hs.get_external_cache()

# Stores the state groups we've recently added to the joined hosts
# external cache. Note that the timeout must be significantly less than
# the TTL on the external cache.
self._external_cache_joined_hosts_updates = (
None
) # type: Optional[ExpiringCache]
if self._external_cache.is_enabled():
self._external_cache_joined_hosts_updates = ExpiringCache(
"_external_cache_joined_hosts_updates",
self.clock,
expiry_ms=30 * 60 * 1000,
)

async def create_event(
self,
requester: Requester,
Expand Down Expand Up @@ -957,7 +971,7 @@ async def handle_new_client_event(

await self.action_generator.handle_push_actions_for_event(event, context)

await self.cache_joined_hosts_for_event(event)
await self.cache_joined_hosts_for_event(event, context)

try:
# If we're a worker we need to hit out to the master.
Expand Down Expand Up @@ -998,31 +1012,41 @@ async def handle_new_client_event(
await self.store.remove_push_actions_from_staging(event.event_id)
raise

async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
async def cache_joined_hosts_for_event(
self, event: EventBase, context: EventContext
) -> None:
"""Precalculate the joined hosts at the event, when using Redis, so that
external federation senders don't have to recalculate it themselves.
"""

if not self._external_cache.is_enabled():
return

# If external cache is enabled we should always have this.
assert self._external_cache_joined_hosts_updates is not None

# We actually store two mappings, event ID -> prev state group,
# state group -> joined hosts, which is much more space efficient
# than event ID -> joined hosts.
#
# Note: We have to cache event ID -> prev state group, as we don't
# store that in the DB.
#
# Note: We always set the state group -> joined hosts cache, even if
# we already set it, so that the expiry time is reset.
# Note: We set the state group -> joined hosts cache if it hasn't been
# set for a while, so that the expiry time is reset.

state_entry = await self.state.resolve_state_groups_for_events(
event.room_id, event_ids=event.prev_event_ids()
)

if state_entry.state_group:
if state_entry.state_group in self._external_cache_joined_hosts_updates:
return

joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)

# Note that the expiry times must be larger than the expiry time in
# _external_cache_joined_hosts_updates.
await self._external_cache.set(
"event_to_prev_state_group",
event.event_id,
Expand All @@ -1036,6 +1060,8 @@ async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
expiry_ms=60 * 60 * 1000,
)

self._external_cache_joined_hosts_updates[state_entry.state_group] = None

async def _validate_canonical_alias(
self, directory_handler, room_alias_str: str, expected_room_id: str
) -> None:
Expand Down