From a732796149a5913949ef68bef9c55e83b4ea1d87 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 21 Sep 2022 13:34:46 -0500
Subject: [PATCH 1/3] Scratch changes to fix have_seen_event not being
 invalidated

See https://github.com/matrix-org/synapse/issues/13856
---
 .../conf/workers-shared-extra.yaml.j2         |  4 +-
 synapse/handlers/message.py                   |  1 +
 synapse/storage/databases/main/cache.py       |  2 +
 synapse/util/ratelimitutils.py                |  3 ++
 .../databases/main/test_events_worker.py      | 47 +++++++++++++++++++
 tests/test_utils/event_injection.py           |  3 +-
 6 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2
index 9e554a865ee5..3f5bbf12f83b 100644
--- a/docker/complement/conf/workers-shared-extra.yaml.j2
+++ b/docker/complement/conf/workers-shared-extra.yaml.j2
@@ -31,7 +31,9 @@ federation_ip_range_blacklist: []
 # Disable server rate-limiting
 rc_federation:
   window_size: 1000
-  sleep_limit: 10
+  # XXX: We run into the rate limiter hard with the MSC2716 tests.
+  # We go from 35s /messages requests to 20s just by making `/state_ids` and `/state` go faster
+  sleep_limit: 99999
   sleep_delay: 500
   reject_limit: 99999
   concurrent: 3
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 10b5dad03009..e391338406b9 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1078,6 +1078,7 @@ async def create_new_client_event(
         else:
             prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
 
+        logger.info("allow_no_prev_events=%s", allow_no_prev_events)
         # Do a quick sanity check here, rather than waiting until we've created the
         # event and then try to auth it (which fails with a somewhat confusing "No
         # create event in auth events")
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 12e9a423826a..aabf3dbba035 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -54,6 +54,7 @@ def __init__(
         db_conn: LoggingDatabaseConnection,
         hs: "HomeServer",
     ):
+        logger.info("CacheInvalidationWorkerStore constructor")
         super().__init__(database, db_conn, hs)
 
         self._instance_name = hs.get_instance_name()
@@ -222,6 +223,7 @@ def _invalidate_caches_for_event(
         # This invalidates any local in-memory cached event objects, the original
         # process triggering the invalidation is responsible for clearing any external
         # cached objects.
+ logger.info("_invalidate_caches_for_event event_id=%s", event_id) self._invalidate_local_get_event_cache(event_id) self.have_seen_event.invalidate((room_id, event_id)) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 0154f92107e7..ac9b7cb6c038 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -216,6 +216,9 @@ def __init__( self.reject_limit = config.reject_limit self.concurrent_requests = config.concurrent + logger.info("self.sleep_limit=%s", self.sleep_limit) + logger.info("self.reject_limit=%s", self.reject_limit) + # request_id objects for requests which have been slept self.sleeping_requests: Set[object] = set() diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 67401272ac37..6602dffea092 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -20,6 +20,9 @@ from twisted.internet.defer import CancelledError, Deferred, ensureDeferred from twisted.test.proto_helpers import MemoryReactor +import synapse.rest.admin +import synapse.rest.client.login +import synapse.rest.client.room from synapse.api.room_versions import EventFormatVersions, RoomVersions from synapse.events import make_event_from_dict from synapse.logging.context import LoggingContext @@ -33,12 +36,20 @@ from synapse.storage.types import Connection from synapse.util import Clock from synapse.util.async_helpers import yieldable_gather_results +from tests.test_utils.event_injection import create_event from tests import unittest class HaveSeenEventsTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + synapse.rest.client.login.register_servlets, + synapse.rest.client.room.register_servlets, + ] + def prepare(self, reactor, clock, hs): + self.hs = hs self.store: EventsWorkerStore = hs.get_datastores().main # insert some test data @@ -121,6 +132,42 @@ def test_query_via_event_cache(self): self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) + def test_persisting_event_invalidates_cache(self): + with LoggingContext(name="test") as ctx: + alice = self.register_user("alice", "pass") + alice_token = self.login("alice", "pass") + room_id = self.helper.create_room_as(alice, tok=alice_token) + + event, event_context = self.get_success( + create_event( + self.hs, + room_id=room_id, + room_version="6", + sender=alice, + type="test_event_type", + content={"body": "foobarbaz"}, + ) + ) + + # Check first `have_seen_events` for an event we have not seen yet + # to prime the cache with a `false`. + res = self.get_success( + self.store.have_seen_events(event.room_id, [event.event_id]) + ) + self.assertEqual(res, set()) + + # that should result in a single db query to lookup if we have the + # event that we have not persisted yet. 
+            self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
+
+            persistence = self.hs.get_storage_controllers().persistence
+            self.get_success(
+                persistence.persist_event(
+                    event,
+                    event_context,
+                )
+            )
+

 class EventCacheTestCase(unittest.HomeserverTestCase):
     """Test that the various layers of event cache works."""
diff --git a/tests/test_utils/event_injection.py b/tests/test_utils/event_injection.py
index 8027c7a856e2..978dffcda2ff 100644
--- a/tests/test_utils/event_injection.py
+++ b/tests/test_utils/event_injection.py
@@ -93,7 +93,8 @@ async def create_event(
         KNOWN_ROOM_VERSIONS[room_version], kwargs
     )
     event, context = await hs.get_event_creation_handler().create_new_client_event(
-        builder, prev_event_ids=prev_event_ids
+        builder,
+        prev_event_ids=prev_event_ids,
     )
 
     return event, context

From dd4be2453f59926af005598b2d168654d13cadd0 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 21 Sep 2022 15:45:08 -0500
Subject: [PATCH 2/3] Fix have_seen_event cache not being invalidated when we
 persist the event

Fix for https://github.com/matrix-org/synapse/issues/13856

This is fixed by calling `_invalidate_caches_for_event` when we persist an
event, along with an additional fix in `_invalidate_caches_for_event` itself
to make sure it uses the correct cache key. The key shape seems like an easy
foot-gun for any `tree=True` cache.

Wrong:
```py
self.have_seen_event.invalidate((room_id, event_id))
```

Correct:
```py
self.have_seen_event.invalidate(((room_id, event_id),))
```
---
 synapse/handlers/message.py                   |   1 -
 synapse/storage/databases/main/cache.py       |  11 +-
 synapse/storage/databases/main/events.py      |  18 +++
 .../storage/databases/main/events_worker.py   |   4 +-
 synapse/util/caches/deferred_cache.py         |   6 +
 synapse/util/caches/lrucache.py               |   6 +
 .../databases/main/test_events_worker.py      | 135 ++++++++----------
 7 files changed, 98 insertions(+), 83 deletions(-)

diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e391338406b9..10b5dad03009 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1078,7 +1078,6 @@ async def create_new_client_event(
         else:
             prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
 
-        logger.info("allow_no_prev_events=%s", allow_no_prev_events)
         # Do a quick sanity check here, rather than waiting until we've created the
         # event and then try to auth it (which fails with a somewhat confusing "No
         # create event in auth events")
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index aabf3dbba035..53646b978a6e 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -223,9 +223,16 @@ def _invalidate_caches_for_event(
         # This invalidates any local in-memory cached event objects, the original
         # process triggering the invalidation is responsible for clearing any external
         # cached objects.
- logger.info("_invalidate_caches_for_event event_id=%s", event_id) + logger.info( + "CacheInvalidationWorkerStore _invalidate_caches_for_event room_id=%s event_id=%s", + room_id, + event_id, + ) + logger.info( + "CacheInvalidationWorkerStore self.have_seen_event=%s", self.have_seen_event + ) self._invalidate_local_get_event_cache(event_id) - self.have_seen_event.invalidate((room_id, event_id)) + self.have_seen_event.invalidate(((room_id, event_id),)) self.get_latest_event_ids_in_room.invalidate((room_id,)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 5932668f2fbe..368e9b47e978 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -434,6 +434,24 @@ def _persist_events_txn( self._store_event_txn(txn, events_and_contexts=events_and_contexts) + for event, _ in events_and_contexts: + # We expect events to be persisted by this point + assert event.internal_metadata.stream_ordering + + relation = relation_from_event(event) + self.store._invalidate_caches_for_event( + stream_ordering=event.internal_metadata.stream_ordering, + event_id=event.event_id, + room_id=event.room_id, + etype=event.type, + state_key=None, # event.state_key, + # TODO + redacts=None, + relates_to=relation.parent_id if relation else None, + # TODO + backfilled=False, + ) + self._persist_transaction_ids_txn(txn, events_and_contexts) # Insert into event_to_state_groups. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 9f6b1fcef1ce..debe8e5f3f6e 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1453,7 +1453,7 @@ async def have_events_in_timeline(self, event_ids: Iterable[str]) -> Set[str]: @trace @tag_args async def have_seen_events( - self, room_id: str, event_ids: Iterable[str] + self, room_id: str, event_ids: Collection[str] ) -> Set[str]: """Given a list of event ids, check if we have already processed them. @@ -1468,6 +1468,7 @@ async def have_seen_events( Returns: The set of events we have already seen. """ + logger.info("have_seen_events room_id=%s event_ids=%s", room_id, event_ids) # @cachedList chomps lots of memory if you call it with a big list, so # we break it down. However, each batch requires its own index scan, so we make @@ -1491,6 +1492,7 @@ async def _have_seen_events_dict( Returns: a dict {(room_id, event_id)-> bool} """ + logger.info("_have_seen_events_dict keys=%s", keys) # if the event cache contains the event, obviously we've seen it. cache_results = { diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 6425f851eaa4..36b05fc344dc 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -383,8 +383,14 @@ def invalidate(self, key: KT) -> None: may be of lower cardinality than the TreeCache - in which case the whole subtree is deleted. 
""" + import logging + + logger = logging.getLogger(__name__) + logger.info("DeferredCache before=%s", self.cache.len()) + logger.info("DeferredCache invalidate key=%s", key) self.check_thread() self.cache.del_multi(key) + logger.info("DeferredCache after=%s", self.cache.len()) # if we have a pending lookup for this key, remove it from the # _pending_deferred_cache, which will (a) stop it being returned for diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index aa93109d1380..5a745eb8c5b6 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -511,6 +511,7 @@ def add_node( callbacks, prune_unread_entries, ) + logger.info("LruCache add_node key=%s value=%s", key, value) cache[key] = node if size_callback: @@ -722,7 +723,12 @@ def cache_del_multi(key: KT) -> None: may be of lower cardinality than the TreeCache - in which case the whole subtree is deleted. """ + logger.info( + "LruCache cache values before pop %s", + {node.key: node.value for node in cache.values()}, + ) popped = cache.pop(key, None) + logger.info("LruCache cache_del_multi key=%s popped=%s", key, popped) if popped is None: return # for each deleted node, we now need to remove it from the linked list diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 6602dffea092..158ad1f4393e 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -20,9 +20,6 @@ from twisted.internet.defer import CancelledError, Deferred, ensureDeferred from twisted.test.proto_helpers import MemoryReactor -import synapse.rest.admin -import synapse.rest.client.login -import synapse.rest.client.room from synapse.api.room_versions import EventFormatVersions, RoomVersions from synapse.events import make_event_from_dict from synapse.logging.context import LoggingContext @@ -36,76 +33,47 @@ from synapse.storage.types import Connection from synapse.util import Clock from synapse.util.async_helpers import yieldable_gather_results -from tests.test_utils.event_injection import create_event from tests import unittest +from tests.test_utils.event_injection import create_event, inject_event class HaveSeenEventsTestCase(unittest.HomeserverTestCase): servlets = [ - synapse.rest.admin.register_servlets, - synapse.rest.client.login.register_servlets, - synapse.rest.client.room.register_servlets, + admin.register_servlets, + room.register_servlets, + login.register_servlets, ] def prepare(self, reactor, clock, hs): self.hs = hs self.store: EventsWorkerStore = hs.get_datastores().main - # insert some test data - for rid in ("room1", "room2"): - self.get_success( - self.store.db_pool.simple_insert( - "rooms", - {"room_id": rid, "room_version": 4}, - ) - ) + self.user = self.register_user("user", "pass") + self.token = self.login(self.user, "pass") + self.room_id = self.helper.create_room_as(self.user, tok=self.token) self.event_ids: List[str] = [] - for idx, rid in enumerate( - ( - "room1", - "room1", - "room1", - "room2", - ) - ): - event_json = {"type": f"test {idx}", "room_id": rid} - event = make_event_from_dict(event_json, room_version=RoomVersions.V4) - event_id = event.event_id - - self.get_success( - self.store.db_pool.simple_insert( - "events", - { - "event_id": event_id, - "room_id": rid, - "topological_ordering": idx, - "stream_ordering": idx, - "type": event.type, - "processed": True, - "outlier": False, - }, - ) - ) - self.get_success( - 
self.store.db_pool.simple_insert(
-                    "event_json",
-                    {
-                        "event_id": event_id,
-                        "room_id": rid,
-                        "json": json.dumps(event_json),
-                        "internal_metadata": "{}",
-                        "format_version": 3,
-                    },
+        for i in range(3):
+            event = self.get_success(
+                inject_event(
+                    hs,
+                    room_version=RoomVersions.V7.identifier,
+                    room_id=self.room_id,
+                    sender=self.user,
+                    type="test_event_type",
+                    content={"body": f"foobarbaz{i}"},
                 )
             )
-            self.event_ids.append(event_id)
+
+            self.event_ids.append(event.event_id)
 
     def test_simple(self):
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+                self.store.have_seen_events(
+                    self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+                )
             )
 
             self.assertEqual(res, {self.event_ids[0]})
@@ -115,7 +83,9 @@ def test_simple(self):
         # a second lookup of the same events should cause no queries
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+                self.store.have_seen_events(
+                    self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+                )
             )
             self.assertEqual(res, {self.event_ids[0]})
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
@@ -127,46 +97,53 @@ def test_query_via_event_cache(self):
         # looking it up should now cause no db hits
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0]])
+                self.store.have_seen_events(self.room_id, [self.event_ids[0]])
             )
             self.assertEqual(res, {self.event_ids[0]})
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
 
     def test_persisting_event_invalidates_cache(self):
-        with LoggingContext(name="test") as ctx:
-            alice = self.register_user("alice", "pass")
-            alice_token = self.login("alice", "pass")
-            room_id = self.helper.create_room_as(alice, tok=alice_token)
-
-            event, event_context = self.get_success(
-                create_event(
-                    self.hs,
-                    room_id=room_id,
-                    room_version="6",
-                    sender=alice,
-                    type="test_event_type",
-                    content={"body": "foobarbaz"},
-                )
+        event, event_context = self.get_success(
+            create_event(
+                self.hs,
+                room_id=self.room_id,
+                sender=self.user,
+                type="test_event_type",
+                content={"body": "garply"},
             )
+        )
 
-            # Check first `have_seen_events` for an event we have not seen yet
-            # to prime the cache with a `false`.
+        with LoggingContext(name="test") as ctx:
+            # First, check `have_seen_event` for an event we have not seen yet
+            # to prime the cache with a `false` value.
             res = self.get_success(
                 self.store.have_seen_events(event.room_id, [event.event_id])
             )
            self.assertEqual(res, set())
 
-            # that should result in a single db query to lookup if we have the
-            # event that we have not persisted yet.
+            # That should result in a single db query to look up the event.
            self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
 
-            persistence = self.hs.get_storage_controllers().persistence
-            self.get_success(
-                persistence.persist_event(
-                    event,
-                    event_context,
-                )
+        # Persist the event, which should invalidate or prefill the
+        # `have_seen_event` cache so we don't return stale values.
+        persistence = self.hs.get_storage_controllers().persistence
+        self.get_success(
+            persistence.persist_event(
+                event,
+                event_context,
             )
+        )
+
+        with LoggingContext(name="test") as ctx:
+            # Check `have_seen_event` again; after persisting the event, it
+            # should now report that we have seen it.
+            res = self.get_success(
+                self.store.have_seen_events(event.room_id, [event.event_id])
+            )
+            self.assertEqual(res, {event.event_id})
+
+            # That should result in a single db query to look up the event.
+            self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
 
 
 class EventCacheTestCase(unittest.HomeserverTestCase):

From 24905b78f57c330c8e7bf53633326a311b945196 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 21 Sep 2022 17:55:45 -0500
Subject: [PATCH 3/3] Move cache invalidation to the main thread

See https://github.com/matrix-org/synapse/pull/13861#discussion_r976982283
---
 synapse/storage/controllers/persist_events.py | 32 ++++++++++++++++++-
 synapse/storage/databases/main/cache.py       |  3 --
 synapse/storage/databases/main/events.py      | 18 -----------
 3 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index bde7a6648ae7..ffe14283a315 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -43,7 +43,7 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.events import EventBase
+from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.tracing import (
@@ -435,6 +435,21 @@ async def enqueue(
             else:
                 events.append(event)
 
+            # We expect events to be persisted by this point
+            assert event.internal_metadata.stream_ordering
+            # Invalidate related caches after we persist a new event
+            relation = relation_from_event(event)
+            self.main_store._invalidate_caches_for_event(
+                stream_ordering=event.internal_metadata.stream_ordering,
+                event_id=event.event_id,
+                room_id=event.room_id,
+                etype=event.type,
+                state_key=event.state_key if hasattr(event, "state_key") else None,
+                redacts=event.redacts,
+                relates_to=relation.parent_id if relation else None,
+                backfilled=backfilled,
+            )
+
         return (
             events,
             self.main_store.get_room_max_token(),
@@ -467,6 +482,21 @@ async def persist_event(
         replaced_event = replaced_events.get(event.event_id)
         if replaced_event:
             event = await self.main_store.get_event(replaced_event)
+        else:
+            # We expect events to be persisted by this point
+            assert event.internal_metadata.stream_ordering
+            # Invalidate related caches after we persist a new event
+            relation = relation_from_event(event)
+            self.main_store._invalidate_caches_for_event(
+                stream_ordering=event.internal_metadata.stream_ordering,
+                event_id=event.event_id,
+                room_id=event.room_id,
+                etype=event.type,
+                state_key=event.state_key if hasattr(event, "state_key") else None,
+                redacts=event.redacts,
+                relates_to=relation.parent_id if relation else None,
+                backfilled=backfilled,
+            )
 
         event_stream_id = event.internal_metadata.stream_ordering
         # stream ordering should have been assigned by now
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 53646b978a6e..b28a91e8f638 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -228,9 +228,6 @@ def _invalidate_caches_for_event(
             room_id,
             event_id,
         )
-        logger.info(
-            "CacheInvalidationWorkerStore self.have_seen_event=%s", self.have_seen_event
-        )
         self._invalidate_local_get_event_cache(event_id)
 
         self.have_seen_event.invalidate(((room_id, event_id),))
 
diff --git a/synapse/storage/databases/main/events.py 
b/synapse/storage/databases/main/events.py index 368e9b47e978..5932668f2fbe 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -434,24 +434,6 @@ def _persist_events_txn( self._store_event_txn(txn, events_and_contexts=events_and_contexts) - for event, _ in events_and_contexts: - # We expect events to be persisted by this point - assert event.internal_metadata.stream_ordering - - relation = relation_from_event(event) - self.store._invalidate_caches_for_event( - stream_ordering=event.internal_metadata.stream_ordering, - event_id=event.event_id, - room_id=event.room_id, - etype=event.type, - state_key=None, # event.state_key, - # TODO - redacts=None, - relates_to=relation.parent_id if relation else None, - # TODO - backfilled=False, - ) - self._persist_transaction_ids_txn(txn, events_and_contexts) # Insert into event_to_state_groups.
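
A note on the `tree=True` cache-key shape that patch 2 fixes: `DeferredCache.invalidate` hands the key tuple to `del_multi` on the underlying cache (see the `self.cache.del_multi(key)` call and its docstring in `deferred_cache.py` above), and a tree cache walks one tree level per element of that tuple. The sketch below is a toy model of that behaviour, not Synapse's actual `TreeCache`/`DeferredCache` implementation; it only assumes what the patch 2 commit message observes: the `have_seen_event` cache ends up populated with a single key part that is itself the `(room_id, event_id)` tuple.

```py
from typing import Any, Dict, Tuple


class ToyTreeCache:
    """Minimal nested-dict tree cache: each element of the key tuple is one level."""

    def __init__(self) -> None:
        self._root: Dict[Any, Any] = {}

    def set(self, key: Tuple[Any, ...], value: Any) -> None:
        node = self._root
        for part in key[:-1]:
            node = node.setdefault(part, {})
        node[key[-1]] = value

    def del_multi(self, key: Tuple[Any, ...]) -> None:
        # Delete the entry (or whole subtree) under `key`, if present.
        node = self._root
        for part in key[:-1]:
            node = node.get(part)
            if node is None:
                return  # no such branch, nothing to delete
        node.pop(key[-1], None)


cache = ToyTreeCache()
room_id, event_id = "!room:hs", "$event"

# Populated the way the commit message describes: the (room_id, event_id)
# pair is one key part, so the full cache key is ((room_id, event_id),).
cache.set(((room_id, event_id),), False)

# Wrong shape: interpreted as two separate key parts, so it walks a branch
# that does not exist and deletes nothing.
cache.del_multi((room_id, event_id))
assert cache._root == {(room_id, event_id): False}  # stale `False` survives

# Correct shape: one key part, the tuple itself.
cache.del_multi(((room_id, event_id),))
assert cache._root == {}
```

In this sketch the wrong shape fails silently rather than raising, which is what makes it an easy foot-gun: the stale `False` primed by the pre-persist `have_seen_events` lookup would survive persistence, exactly the symptom reported in https://github.com/matrix-org/synapse/issues/13856.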