From 39d4188a4883512a16f5a2cb38845fe52d939cec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2022 13:48:12 +0000 Subject: [PATCH 1/7] Fix a txn description --- synapse/storage/databases/main/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index bef675b8453c..87a63d99f884 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -772,7 +772,7 @@ async def _get_joined_profiles_from_event_ids(self, event_ids: Iterable[str]): retcols=("user_id", "display_name", "avatar_url", "event_id"), keyvalues={"membership": Membership.JOIN}, batch_size=500, - desc="_get_membership_from_event_ids", + desc="_get_joined_profiles_from_event_ids", ) return { From 998b3b7e6e3d61fa9235734f076326d3a041b76c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2022 15:22:36 +0000 Subject: [PATCH 2/7] Return a proper type in `get_membership_from_event_ids` --- synapse/push/bulk_push_rule_evaluator.py | 28 +++++++++++--------- synapse/storage/databases/main/roommember.py | 19 +++++++++++-- synapse/storage/persist_events.py | 14 +++++++--- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 030898e4d0cc..fbae91defeae 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -24,6 +24,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.state import POWER_KEY +from synapse.storage.databases.main.roommember import EventIdMembership from synapse.util.async_helpers import Linearizer from synapse.util.caches import CacheMetric, register_cache from synapse.util.caches.descriptors import lru_cache @@ -292,7 +293,7 @@ def _condition_checker( return True -MemberMap = Dict[str, Tuple[str, str]] +MemberMap = Dict[str, EventIdMembership] Rule = Dict[str, dict] RulesByUser = Dict[str, List[Rule]] StateGroup = Union[object, int] @@ -306,7 +307,7 @@ class RulesForRoomData: *only* include data, and not references to e.g. the data stores. """ - # event_id -> (user_id, state) + # event_id -> EventIdMembership member_map: MemberMap = attr.Factory(dict) # user_id -> rules rules_by_user: RulesByUser = attr.Factory(dict) @@ -447,11 +448,10 @@ async def get_rules( res = self.data.member_map.get(event_id, None) if res: - user_id, state = res - if state == Membership.JOIN: - rules = self.data.rules_by_user.get(user_id, None) + if res.membership == Membership.JOIN: + rules = self.data.rules_by_user.get(res.user_id, None) if rules: - ret_rules_by_user[user_id] = rules + ret_rules_by_user[res.user_id] = rules continue # If a user has left a room we remove their push rule. If they @@ -502,24 +502,26 @@ async def _update_rules_with_member_event_ids( """ sequence = self.data.sequence - rows = await self.store.get_membership_from_event_ids(member_event_ids.values()) - - members = {row["event_id"]: (row["user_id"], row["membership"]) for row in rows} + members = await self.store.get_membership_from_event_ids( + member_event_ids.values() + ) # If the event is a join event then it will be in current state evnts # map but not in the DB, so we have to explicitly insert it. if event.type == EventTypes.Member: for event_id in member_event_ids.values(): if event_id == event.event_id: - members[event_id] = (event.state_key, event.membership) + members[event_id] = EventIdMembership( + user_id=event.state_key, membership=event.membership + ) if logger.isEnabledFor(logging.DEBUG): logger.debug("Found members %r: %r", self.room_id, members.values()) joined_user_ids = { - user_id - for user_id, membership in members.values() - if membership == Membership.JOIN + entry.user_id + for entry in members.values() + if entry.membership == Membership.JOIN } logger.debug("Joined: %r", joined_user_ids) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 87a63d99f884..d08bf0698e69 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -63,6 +63,14 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership" +@attr.s(frozen=True, slots=True, auto_attribs=True) +class EventIdMembership: + """Returned by `get_membership_from_event_ids`""" + + user_id: str + membership: str + + class RoomMemberWorkerStore(EventsWorkerStore): def __init__( self, @@ -1002,10 +1010,10 @@ async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]: async def get_membership_from_event_ids( self, member_event_ids: Iterable[str] - ) -> List[dict]: + ) -> Dict[str, EventIdMembership]: """Get user_id and membership of a set of event IDs.""" - return await self.db_pool.simple_select_many_batch( + rows = await self.db_pool.simple_select_many_batch( table="room_memberships", column="event_id", iterable=member_event_ids, @@ -1015,6 +1023,13 @@ async def get_membership_from_event_ids( desc="get_membership_from_event_ids", ) + return { + row["event_id"]: EventIdMembership( + membership=row["memebrship"], user_id=row["user_id"] + ) + for row in rows + } + async def is_local_host_in_room_ignoring_users( self, room_id: str, ignore_users: Collection[str] ) -> bool: diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 7d543fdbe08a..0bfd9879cd8a 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -1023,8 +1023,12 @@ async def _is_server_still_joined( # Check if any of the changes that we don't have events for are joins. if events_to_check: - rows = await self.main_store.get_membership_from_event_ids(events_to_check) - is_still_joined = any(row["membership"] == Membership.JOIN for row in rows) + members = await self.main_store.get_membership_from_event_ids( + events_to_check + ) + is_still_joined = any( + member.membership == Membership.JOIN for member in members.values() + ) if is_still_joined: return True @@ -1060,9 +1064,11 @@ async def _is_server_still_joined( ), event_id in current_state.items() if typ == EventTypes.Member and not self.is_mine_id(state_key) ] - rows = await self.main_store.get_membership_from_event_ids(remote_event_ids) + members = await self.main_store.get_membership_from_event_ids(remote_event_ids) potentially_left_users.update( - row["user_id"] for row in rows if row["membership"] == Membership.JOIN + member.user_id + for member in members.values() + if member.membership == Membership.JOIN ) return False From 992599389a938970038b5fd57f2f761ead001e86 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2022 15:30:33 +0000 Subject: [PATCH 3/7] Add cache to `get_membership_from_event_ids`. --- synapse/storage/databases/main/roommember.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index d08bf0698e69..6f13eff40ed5 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1008,6 +1008,15 @@ async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]: return set(room_ids) + @cached(max_entries=5000) + async def _get_membership_from_event_id( + self, member_event_id: str + ) -> EventIdMembership: + raise NotImplementedError() + + @cachedList( + cached_method_name="_get_membership_from_event_id", list_name="member_event_ids" + ) async def get_membership_from_event_ids( self, member_event_ids: Iterable[str] ) -> Dict[str, EventIdMembership]: @@ -1025,7 +1034,7 @@ async def get_membership_from_event_ids( return { row["event_id"]: EventIdMembership( - membership=row["memebrship"], user_id=row["user_id"] + membership=row["membership"], user_id=row["user_id"] ) for row in rows } From cd42fd29f192aa610448bd433e6638eca7899936 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2022 11:27:12 +0000 Subject: [PATCH 4/7] Newsfile --- changelog.d/12272.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12272.misc diff --git a/changelog.d/12272.misc b/changelog.d/12272.misc new file mode 100644 index 000000000000..95589f3361e0 --- /dev/null +++ b/changelog.d/12272.misc @@ -0,0 +1 @@ +Add a new cache `_get_membership_from_event_id` to speed up push rule calculations in large rooms. From 11d2a9d05dc5687b9dbd14e8b9cacded4cbe114f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2022 12:16:17 +0000 Subject: [PATCH 5/7] cachedList will return None values --- synapse/push/bulk_push_rule_evaluator.py | 4 ++-- synapse/storage/databases/main/roommember.py | 11 ++++++++--- synapse/storage/persist_events.py | 5 +++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index fbae91defeae..ddc64ad6a3e4 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -293,7 +293,7 @@ def _condition_checker( return True -MemberMap = Dict[str, EventIdMembership] +MemberMap = Dict[str, Optional[EventIdMembership]] Rule = Dict[str, dict] RulesByUser = Dict[str, List[Rule]] StateGroup = Union[object, int] @@ -521,7 +521,7 @@ async def _update_rules_with_member_event_ids( joined_user_ids = { entry.user_id for entry in members.values() - if entry.membership == Membership.JOIN + if entry and entry.membership == Membership.JOIN } logger.debug("Joined: %r", joined_user_ids) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 6f13eff40ed5..3248da5356a4 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1011,7 +1011,7 @@ async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]: @cached(max_entries=5000) async def _get_membership_from_event_id( self, member_event_id: str - ) -> EventIdMembership: + ) -> Optional[EventIdMembership]: raise NotImplementedError() @cachedList( @@ -1019,8 +1019,13 @@ async def _get_membership_from_event_id( ) async def get_membership_from_event_ids( self, member_event_ids: Iterable[str] - ) -> Dict[str, EventIdMembership]: - """Get user_id and membership of a set of event IDs.""" + ) -> Dict[str, Optional[EventIdMembership]]: + """Get user_id and membership of a set of event IDs. + + Returns: + Mapping from event ID to `EventIdMembership` if the event is a + membership event, otherwise the value is None. + """ rows = await self.db_pool.simple_select_many_batch( table="room_memberships", diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 0bfd9879cd8a..b40292281767 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -1027,7 +1027,8 @@ async def _is_server_still_joined( events_to_check ) is_still_joined = any( - member.membership == Membership.JOIN for member in members.values() + member and member.membership == Membership.JOIN + for member in members.values() ) if is_still_joined: return True @@ -1068,7 +1069,7 @@ async def _is_server_still_joined( potentially_left_users.update( member.user_id for member in members.values() - if member.membership == Membership.JOIN + if member and member.membership == Membership.JOIN ) return False From 3f5992fbd64c785c387e184759653ff252a417c3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Mar 2022 14:36:51 +0000 Subject: [PATCH 6/7] Invalidate the cache --- synapse/push/bulk_push_rule_evaluator.py | 2 +- synapse/storage/databases/main/cache.py | 4 ++++ synapse/storage/databases/main/events.py | 7 +++++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index ddc64ad6a3e4..a402a3e40374 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -506,7 +506,7 @@ async def _update_rules_with_member_event_ids( member_event_ids.values() ) - # If the event is a join event then it will be in current state evnts + # If the event is a join event then it will be in current state events # map but not in the DB, so we have to explicitly insert it. if event.type == EventTypes.Member: for event_id in member_event_ids.values(): diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 2d7511d61391..dd4e83a2ad19 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -192,6 +192,10 @@ def _invalidate_caches_for_event( self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,)) + # The `_get_membership_from_event_id` is immutable, except for the + # case where we look up an event *before* persisting it. + self._get_membership_from_event_id.invalidate((event_id,)) + if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1f60aef180d0..aacbe65c5059 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1745,6 +1745,13 @@ def non_null_str_or_none(val: Any) -> Optional[str]: (event.state_key,), ) + # The `_get_joined_profile_from_event_id` is immutable, except for the + # case where we look up an event *before* persisting it. + txn.call_after( + self.store._get_membership_from_event_id.invalidate, + (event.event_id,), + ) + # We update the local_current_membership table only if the event is # "current", i.e., its something that has just happened. # From d901960bcab7553a04f7b691f978139aa86dc721 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Mar 2022 14:02:48 +0000 Subject: [PATCH 7/7] Update synapse/storage/databases/main/events.py Co-authored-by: reivilibre --- synapse/storage/databases/main/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index aacbe65c5059..d253243125fb 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1745,7 +1745,7 @@ def non_null_str_or_none(val: Any) -> Optional[str]: (event.state_key,), ) - # The `_get_joined_profile_from_event_id` is immutable, except for the + # The `_get_membership_from_event_id` is immutable, except for the # case where we look up an event *before* persisting it. txn.call_after( self.store._get_membership_from_event_id.invalidate,