Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Scratch changes to fix have_seen_event not being invalidated #13861

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ federation_ip_range_blacklist: []
# Disable server rate-limiting
rc_federation:
window_size: 1000
sleep_limit: 10
# foo: We run into the rate limiter hard with the MSC2716 tests.
# We go from 35s /messages requests to 20s just by making `/state_ids` and `/state` go faster
sleep_limit: 99999
sleep_delay: 500
reject_limit: 99999
concurrent: 3
Expand Down
32 changes: 31 additions & 1 deletion synapse/storage/controllers/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.tracing import (
Expand Down Expand Up @@ -435,6 +435,21 @@ async def enqueue(
else:
events.append(event)

# We expect events to be persisted by this point
assert event.internal_metadata.stream_ordering
# Invalidate related caches after we persist a new event
relation = relation_from_event(event)
self.main_store._invalidate_caches_for_event(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split out the fixes to #13863

stream_ordering=event.internal_metadata.stream_ordering,
event_id=event.event_id,
room_id=event.room_id,
etype=event.type,
state_key=event.state_key if hasattr(event, "state_key") else None,
redacts=event.redacts,
relates_to=relation.parent_id if relation else None,
backfilled=backfilled,
)

return (
events,
self.main_store.get_room_max_token(),
Expand Down Expand Up @@ -467,6 +482,21 @@ async def persist_event(
replaced_event = replaced_events.get(event.event_id)
if replaced_event:
event = await self.main_store.get_event(replaced_event)
else:
# We expect events to be persisted by this point
assert event.internal_metadata.stream_ordering
# Invalidate related caches after we persist a new event
relation = relation_from_event(event)
self.main_store._invalidate_caches_for_event(
stream_ordering=event.internal_metadata.stream_ordering,
event_id=event.event_id,
room_id=event.room_id,
etype=event.type,
state_key=event.state_key if hasattr(event, "state_key") else None,
redacts=event.redacts,
relates_to=relation.parent_id if relation else None,
backfilled=backfilled,
)

event_stream_id = event.internal_metadata.stream_ordering
# stream ordering should have been assigned by now
Expand Down
8 changes: 7 additions & 1 deletion synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
logger.info("CacheInvalidationWorkerStore constructor")
super().__init__(database, db_conn, hs)

self._instance_name = hs.get_instance_name()
Expand Down Expand Up @@ -222,8 +223,13 @@ def _invalidate_caches_for_event(
# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
logger.info(
"CacheInvalidationWorkerStore _invalidate_caches_for_event room_id=%s event_id=%s",
room_id,
event_id,
)
self._invalidate_local_get_event_cache(event_id)
self.have_seen_event.invalidate((room_id, event_id))
self.have_seen_event.invalidate(((room_id, event_id),))
Copy link
Contributor Author

@MadLittleMods MadLittleMods Sep 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This key lookup was wrong and we were never invalidating the have_seen_event cache even over replication.


Yes, the cache key literally a set wrapped in set. Something weird with the TreeCache I assume

ex. (('!TnCIJPKzdQdUlIyXdQ:test', '$Iu0eqEBN7qcyF1S9B3oNB3I91v2o5YOgRNPwi_78s-k'),)

LruCache cache values before pop {(('!TnCIJPKzdQdUlIyXdQ:test', '$Iu0eqEBN7qcyF1S9B3oNB3I91v2o5YOgRNPwi_78s-k'),): False}

We should probably check all other instances of this for the same problem. And ideally fix the cache so it uses the expected (room_id, event_id) key tuple instead.


self.get_latest_event_ids_in_room.invalidate((room_id,))

Expand Down
4 changes: 3 additions & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1453,7 +1453,7 @@ async def have_events_in_timeline(self, event_ids: Iterable[str]) -> Set[str]:
@trace
@tag_args
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
self, room_id: str, event_ids: Collection[str]
) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.

Expand All @@ -1468,6 +1468,7 @@ async def have_seen_events(
Returns:
The set of events we have already seen.
"""
logger.info("have_seen_events room_id=%s event_ids=%s", room_id, event_ids)

# @cachedList chomps lots of memory if you call it with a big list, so
# we break it down. However, each batch requires its own index scan, so we make
Expand All @@ -1491,6 +1492,7 @@ async def _have_seen_events_dict(
Returns:
a dict {(room_id, event_id)-> bool}
"""
logger.info("_have_seen_events_dict keys=%s", keys)
# if the event cache contains the event, obviously we've seen it.

cache_results = {
Expand Down
6 changes: 6 additions & 0 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,14 @@ def invalidate(self, key: KT) -> None:
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
import logging

logger = logging.getLogger(__name__)
logger.info("DeferredCache before=%s", self.cache.len())
logger.info("DeferredCache invalidate key=%s", key)
self.check_thread()
self.cache.del_multi(key)
logger.info("DeferredCache after=%s", self.cache.len())

# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, which will (a) stop it being returned for
Expand Down
6 changes: 6 additions & 0 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ def add_node(
callbacks,
prune_unread_entries,
)
logger.info("LruCache add_node key=%s value=%s", key, value)
cache[key] = node

if size_callback:
Expand Down Expand Up @@ -722,7 +723,12 @@ def cache_del_multi(key: KT) -> None:
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
logger.info(
"LruCache cache values before pop %s",
{node.key: node.value for node in cache.values()},
)
popped = cache.pop(key, None)
logger.info("LruCache cache_del_multi key=%s popped=%s", key, popped)
if popped is None:
return
# for each deleted node, we now need to remove it from the linked list
Expand Down
3 changes: 3 additions & 0 deletions synapse/util/ratelimitutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ def __init__(
self.reject_limit = config.reject_limit
self.concurrent_requests = config.concurrent

logger.info("self.sleep_limit=%s", self.sleep_limit)
logger.info("self.reject_limit=%s", self.reject_limit)

# request_id objects for requests which have been slept
self.sleeping_requests: Set[object] = set()

Expand Down
120 changes: 72 additions & 48 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,66 +35,45 @@
from synapse.util.async_helpers import yieldable_gather_results

from tests import unittest
from tests.test_utils.event_injection import create_event, inject_event


class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def prepare(self, reactor, clock, hs):
self.hs = hs
self.store: EventsWorkerStore = hs.get_datastores().main

# insert some test data
for rid in ("room1", "room2"):
self.get_success(
self.store.db_pool.simple_insert(
"rooms",
{"room_id": rid, "room_version": 4},
)
)
self.user = self.register_user("user", "pass")
self.token = self.login(self.user, "pass")
self.room_id = self.helper.create_room_as(self.user, tok=self.token)

self.event_ids: List[str] = []
for idx, rid in enumerate(
(
"room1",
"room1",
"room1",
"room2",
)
):
event_json = {"type": f"test {idx}", "room_id": rid}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id

self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": event_id,
"room_id": rid,
"topological_ordering": idx,
"stream_ordering": idx,
"type": event.type,
"processed": True,
"outlier": False,
},
)
)
self.get_success(
self.store.db_pool.simple_insert(
"event_json",
{
"event_id": event_id,
"room_id": rid,
"json": json.dumps(event_json),
"internal_metadata": "{}",
"format_version": 3,
},
for i in range(3):
event = self.get_success(
inject_event(
hs,
room_version=RoomVersions.V7.identifier,
room_id=self.room_id,
sender=self.user,
type="test_event_type",
content={"body": f"foobarbaz{i}"},
)
)
self.event_ids.append(event_id)

self.event_ids.append(event.event_id)

def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
self.store.have_seen_events(
self.room_id, [self.event_ids[0], "eventdoesnotexist"]
)
)
self.assertEqual(res, {self.event_ids[0]})

Expand All @@ -104,7 +83,9 @@ def test_simple(self):
# a second lookup of the same events should cause no queries
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
self.store.have_seen_events(
self.room_id, [self.event_ids[0], "eventdoesnotexist"]
)
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
Expand All @@ -116,11 +97,54 @@ def test_query_via_event_cache(self):
# looking it up should now cause no db hits
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0]])
self.store.have_seen_events(self.room_id, [self.event_ids[0]])
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)

def test_persisting_event_invalidates_cache(self):
event, event_context = self.get_success(
create_event(
self.hs,
room_id=self.room_id,
sender=self.user,
type="test_event_type",
content={"body": "garply"},
)
)

with LoggingContext(name="test") as ctx:
# First, check `have_seen_event` for an event we have not seen yet
# to prime the cache with a `false` value.
res = self.get_success(
self.store.have_seen_events(event.room_id, [event.event_id])
)
self.assertEqual(res, set())

# That should result in a single db query to lookup
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)

# Persist the event which should invalidate or prefill the
# `have_seen_event` cache so we don't return stale values.
persistence = self.hs.get_storage_controllers().persistence
self.get_success(
persistence.persist_event(
event,
event_context,
)
)

with LoggingContext(name="test") as ctx:
# Check `have_seen_event` again and we should see the updated fact
# that we have now seen the event after persisting it.
res = self.get_success(
self.store.have_seen_events(event.room_id, [event.event_id])
)
self.assertEqual(res, {event.event_id})

# That should result in a single db query to lookup
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)


class EventCacheTestCase(unittest.HomeserverTestCase):
"""Test that the various layers of event cache works."""
Expand Down
3 changes: 2 additions & 1 deletion tests/test_utils/event_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ async def create_event(
KNOWN_ROOM_VERSIONS[room_version], kwargs
)
event, context = await hs.get_event_creation_handler().create_new_client_event(
builder, prev_event_ids=prev_event_ids
builder,
prev_event_ids=prev_event_ids,
)

return event, context