diff --git a/changelog.d/11114.bugfix b/changelog.d/11114.bugfix new file mode 100644 index 000000000000..c6e65df97f90 --- /dev/null +++ b/changelog.d/11114.bugfix @@ -0,0 +1 @@ +Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a37ae0ca094f..c0f642005ff4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -166,9 +166,14 @@ async def _maybe_backfill_inner( oldest_events_with_depth = ( await self.store.get_oldest_event_ids_with_depth_in_room(room_id) ) - insertion_events_to_be_backfilled = ( - await self.store.get_insertion_event_backwards_extremities_in_room(room_id) - ) + + insertion_events_to_be_backfilled: Dict[str, int] = {} + if self.hs.config.experimental.msc2716_enabled: + insertion_events_to_be_backfilled = ( + await self.store.get_insertion_event_backward_extremities_in_room( + room_id + ) + ) logger.debug( "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s", oldest_events_with_depth, @@ -271,11 +276,12 @@ async def _maybe_backfill_inner( ] logger.debug( - "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s", + "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s", room_id, current_depth, limit, max_depth, + len(sorted_extremeties_tuple), sorted_extremeties_tuple, filtered_sorted_extremeties_tuple, ) @@ -1047,6 +1053,19 @@ async def on_backfill_request( limit = min(limit, 100) events = await self.store.get_backfill_events(room_id, pdu_list, limit) + logger.debug( + "on_backfill_request: backfill events=%s", + [ + "event_id=%s,depth=%d,body=%s,prevs=%s\n" + % ( + event.event_id, + event.depth, + event.content.get("body", event.type), + event.prev_event_ids(), + ) + for event in events + ], + ) events = await filter_events_for_server(self.storage, origin, events) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 3905f60b3a78..9edc7369d6fe 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -508,7 +508,11 @@ async def backfill( f"room {ev.room_id}, when we were backfilling in {room_id}" ) - await self._process_pulled_events(dest, events, backfilled=True) + await self._process_pulled_events( + dest, + events, + backfilled=True, + ) async def _get_missing_events_for_pdu( self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int @@ -626,11 +630,24 @@ async def _process_pulled_events( backfilled: True if this is part of a historical batch of events (inhibits notification to clients, and validation of device keys.) """ + logger.debug( + "processing pulled backfilled=%s events=%s", + backfilled, + [ + "event_id=%s,depth=%d,body=%s,prevs=%s\n" + % ( + event.event_id, + event.depth, + event.content.get("body", event.type), + event.prev_event_ids(), + ) + for event in events + ], + ) # We want to sort these by depth so we process them and # tell clients about them in order. sorted_events = sorted(events, key=lambda x: x.depth) - for ev in sorted_events: with nested_logging_context(ev.event_id): await self._process_pulled_event(origin, ev, backfilled=backfilled) @@ -992,6 +1009,8 @@ async def _process_received_pdu( await self._run_push_actions_and_persist_event(event, context, backfilled) + await self._handle_marker_event(origin, event) + if backfilled or context.rejected: return @@ -1071,8 +1090,6 @@ async def _process_received_pdu( event.sender, ) - await self._handle_marker_event(origin, event) - async def _resync_device(self, sender: str) -> None: """We have detected that the device list for the given user may be out of sync, so we try and resync them. @@ -1323,7 +1340,14 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]: return event, context events_to_persist = (x for x in (prep(event) for event in fetched_events) if x) - await self.persist_events_and_notify(room_id, tuple(events_to_persist)) + await self.persist_events_and_notify( + room_id, + tuple(events_to_persist), + # Mark these events backfilled as they're historic events that will + # eventually be backfilled. For example, missing events we fetch + # during backfill should be marked as backfilled as well. + backfilled=True, + ) async def _check_event_auth( self, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b37250aa3895..9267e586a83d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -490,12 +490,12 @@ async def create_event( requester: Requester, event_dict: dict, txn_id: Optional[str] = None, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, require_consent: bool = True, outlier: bool = False, historical: bool = False, - allow_no_prev_events: bool = False, depth: Optional[int] = None, ) -> Tuple[EventBase, EventContext]: """ @@ -510,6 +510,10 @@ async def create_event( requester event_dict: An entire event txn_id + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: the forward extremities to use as the prev_events for the new event. @@ -604,10 +608,10 @@ async def create_event( event, context = await self.create_new_client_event( builder=builder, requester=requester, + allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, depth=depth, - allow_no_prev_events=allow_no_prev_events, ) # In an ideal world we wouldn't need the second part of this condition. However, @@ -764,6 +768,7 @@ async def create_and_send_nonmember_event( self, requester: Requester, event_dict: dict, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, ratelimit: bool = True, @@ -781,6 +786,10 @@ async def create_and_send_nonmember_event( Args: requester: The requester sending the event. event_dict: An entire event. + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: The event IDs to use as the prev events. Should normally be left as None to automatically request them @@ -880,16 +889,20 @@ async def create_new_client_event( self, builder: EventBuilder, requester: Optional[Requester] = None, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, depth: Optional[int] = None, - allow_no_prev_events: bool = False, ) -> Tuple[EventBase, EventContext]: """Create a new event for a local client Args: builder: requester: + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: the forward extremities to use as the prev_events for the new event. @@ -908,7 +921,6 @@ async def create_new_client_event( Returns: Tuple of created event, context """ - # Strip down the auth_event_ids to only what we need to auth the event. # For example, we don't need extra m.room.member that don't match event.sender full_state_ids_at_event = None diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index f880aa93d29a..f8137ec04cc5 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -13,10 +13,6 @@ logger = logging.getLogger(__name__) -def generate_fake_event_id() -> str: - return "$fake_" + random_string(43) - - class RoomBatchHandler: def __init__(self, hs: "HomeServer"): self.hs = hs @@ -182,11 +178,12 @@ async def persist_state_events_at_start( state_event_ids_at_start = [] auth_event_ids = initial_auth_event_ids.copy() - # Make the state events float off on their own so we don't have a - # bunch of `@mxid joined the room` noise between each batch - prev_event_id_for_state_chain = generate_fake_event_id() + # Make the state events float off on their own by specifying no + # prev_events for the first one in the chain so we don't have a bunch of + # `@mxid joined the room` noise between each batch. + prev_event_ids_for_state_chain: List[str] = [] - for state_event in state_events_at_start: + for index, state_event in enumerate(state_events_at_start): assert_params_in_dict( state_event, ["type", "origin_server_ts", "content", "sender"] ) @@ -222,7 +219,10 @@ async def persist_state_events_at_start( content=event_dict["content"], outlier=True, historical=True, - prev_event_ids=[prev_event_id_for_state_chain], + # Only the first event in the chain should be floating. + # The rest should hang off each other in a chain. + allow_no_prev_events=index == 0, + prev_event_ids=prev_event_ids_for_state_chain, # Make sure to use a copy of this list because we modify it # later in the loop here. Otherwise it will be the same # reference and also update in the event when we append later. @@ -242,7 +242,10 @@ async def persist_state_events_at_start( event_dict, outlier=True, historical=True, - prev_event_ids=[prev_event_id_for_state_chain], + # Only the first event in the chain should be floating. + # The rest should hang off each other in a chain. + allow_no_prev_events=index == 0, + prev_event_ids=prev_event_ids_for_state_chain, # Make sure to use a copy of this list because we modify it # later in the loop here. Otherwise it will be the same # reference and also update in the event when we append later. @@ -253,7 +256,7 @@ async def persist_state_events_at_start( state_event_ids_at_start.append(event_id) auth_event_ids.append(event_id) # Connect all the state in a floating chain - prev_event_id_for_state_chain = event_id + prev_event_ids_for_state_chain = [event_id] return state_event_ids_at_start @@ -261,7 +264,6 @@ async def persist_historical_events( self, events_to_create: List[JsonDict], room_id: str, - initial_prev_event_ids: List[str], inherited_depth: int, auth_event_ids: List[str], app_service_requester: Requester, @@ -277,9 +279,6 @@ async def persist_historical_events( events_to_create: List of historical events to create in JSON dictionary format. room_id: Room where you want the events persisted in. - initial_prev_event_ids: These will be the prev_events for the first - event created. Each event created afterwards will point to the - previous event created. inherited_depth: The depth to create the events at (you will probably by calling inherit_depth_from_prev_ids(...)). auth_event_ids: Define which events allow you to create the given @@ -291,11 +290,14 @@ async def persist_historical_events( """ assert app_service_requester.app_service - prev_event_ids = initial_prev_event_ids.copy() + # Make the historical event chain float off on its own by specifying no + # prev_events for the first event in the chain which causes the HS to + # ask for the state at the start of the batch later. + prev_event_ids: List[str] = [] event_ids = [] events_to_persist = [] - for ev in events_to_create: + for index, ev in enumerate(events_to_create): assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % ( @@ -319,6 +321,9 @@ async def persist_historical_events( ev["sender"], app_service_requester.app_service ), event_dict, + # Only the first event in the chain should be floating. + # The rest should hang off each other in a chain. + allow_no_prev_events=index == 0, prev_event_ids=event_dict.get("prev_events"), auth_event_ids=auth_event_ids, historical=True, @@ -370,7 +375,6 @@ async def handle_batch_of_events( events_to_create: List[JsonDict], room_id: str, batch_id_to_connect_to: str, - initial_prev_event_ids: List[str], inherited_depth: int, auth_event_ids: List[str], app_service_requester: Requester, @@ -385,9 +389,6 @@ async def handle_batch_of_events( room_id: Room where you want the events created in. batch_id_to_connect_to: The batch_id from the insertion event you want this batch to connect to. - initial_prev_event_ids: These will be the prev_events for the first - event created. Each event created afterwards will point to the - previous event created. inherited_depth: The depth to create the events at (you will probably by calling inherit_depth_from_prev_ids(...)). auth_event_ids: Define which events allow you to create the given @@ -436,7 +437,6 @@ async def handle_batch_of_events( event_ids = await self.persist_historical_events( events_to_create=events_to_create, room_id=room_id, - initial_prev_event_ids=initial_prev_event_ids, inherited_depth=inherited_depth, auth_event_ids=auth_event_ids, app_service_requester=app_service_requester, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index efe6b4c9aaf3..bf1a47efb020 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -268,7 +268,8 @@ async def _local_membership_update( target: UserID, room_id: str, membership: str, - prev_event_ids: List[str], + allow_no_prev_events: bool = False, + prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, txn_id: Optional[str] = None, ratelimit: bool = True, @@ -286,8 +287,12 @@ async def _local_membership_update( target: room_id: membership: - prev_event_ids: The event IDs to use as the prev events + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. + prev_event_ids: The event IDs to use as the prev events auth_event_ids: The event ids to use as the auth_events for the new event. Should normally be left as None, which will cause them to be calculated @@ -344,6 +349,7 @@ async def _local_membership_update( "membership": membership, }, txn_id=txn_id, + allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, require_consent=require_consent, @@ -446,6 +452,7 @@ async def update_membership( require_consent: bool = True, outlier: bool = False, historical: bool = False, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, ) -> Tuple[str, int]: @@ -470,6 +477,10 @@ async def update_membership( historical: Indicates whether the message is being inserted back in time around some existing events. This is used to skip a few checks and mark the event as backfilled. + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: The event IDs to use as the prev events auth_event_ids: The event ids to use as the auth_events for the new event. @@ -504,6 +515,7 @@ async def update_membership( require_consent=require_consent, outlier=outlier, historical=historical, + allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, ) @@ -525,6 +537,7 @@ async def update_membership_locked( require_consent: bool = True, outlier: bool = False, historical: bool = False, + allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, ) -> Tuple[str, int]: @@ -551,6 +564,10 @@ async def update_membership_locked( historical: Indicates whether the message is being inserted back in time around some existing events. This is used to skip a few checks and mark the event as backfilled. + allow_no_prev_events: Whether to allow this event to be created an empty + list of prev_events. Normally this is prohibited just because most + events should have a prev_event and we should only use this in special + cases like MSC2716. prev_event_ids: The event IDs to use as the prev events auth_event_ids: The event ids to use as the auth_events for the new event. @@ -680,6 +697,7 @@ async def update_membership_locked( membership=effective_membership_state, txn_id=txn_id, ratelimit=ratelimit, + allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, content=content, diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index e4c9451ae06a..4b6be38327e0 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -131,6 +131,14 @@ async def on_POST( prev_event_ids_from_query ) + if not auth_event_ids: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "No auth events found for given prev_event query parameter. The prev_event=%s probably does not exist." + % prev_event_ids_from_query, + errcode=Codes.INVALID_PARAM, + ) + state_event_ids_at_start = [] # Create and persist all of the state events that float off on their own # before the batch. These will most likely be all of the invite/member @@ -197,21 +205,12 @@ async def on_POST( EventContentFields.MSC2716_NEXT_BATCH_ID ] - # Also connect the historical event chain to the end of the floating - # state chain, which causes the HS to ask for the state at the start of - # the batch later. If there is no state chain to connect to, just make - # the insertion event float itself. - prev_event_ids = [] - if len(state_event_ids_at_start): - prev_event_ids = [state_event_ids_at_start[-1]] - # Create and persist all of the historical events as well as insertion # and batch meta events to make the batch navigable in the DAG. event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events( events_to_create=events_to_create, room_id=room_id, batch_id_to_connect_to=batch_id_to_connect_to, - initial_prev_event_ids=prev_event_ids, inherited_depth=inherited_depth, auth_event_ids=auth_event_ids, app_service_requester=requester, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ca71f073fcbc..22f64741277a 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -16,9 +16,10 @@ from queue import Empty, PriorityQueue from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple +import attr from prometheus_client import Counter, Gauge -from synapse.api.constants import MAX_DEPTH +from synapse.api.constants import MAX_DEPTH, EventTypes from synapse.api.errors import StoreError from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict @@ -60,6 +61,15 @@ logger = logging.getLogger(__name__) +# All the info we need while iterating the DAG while backfilling +@attr.s(frozen=True, slots=True, auto_attribs=True) +class BackfillQueueNavigationItem: + depth: int + stream_ordering: int + event_id: str + type: str + + class _NoChainCoverIndex(Exception): def __init__(self, room_id: str): super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,)) @@ -74,6 +84,8 @@ def __init__( ): super().__init__(database, db_conn, hs) + self.hs = hs + if hs.config.worker.run_background_tasks: hs.get_clock().looping_call( self._delete_old_forward_extrem_cache, 60 * 60 * 1000 @@ -737,7 +749,7 @@ def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id): room_id, ) - async def get_insertion_event_backwards_extremities_in_room( + async def get_insertion_event_backward_extremities_in_room( self, room_id ) -> Dict[str, int]: """Get the insertion events we know about that we haven't backfilled yet. @@ -754,7 +766,7 @@ async def get_insertion_event_backwards_extremities_in_room( Map from event_id to depth """ - def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id): + def get_insertion_event_backward_extremities_in_room_txn(txn, room_id): sql = """ SELECT b.event_id, MAX(e.depth) FROM insertion_events as i /* We only want insertion events that are also marked as backwards extremities */ @@ -770,8 +782,8 @@ def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id): return dict(txn) return await self.db_pool.runInteraction( - "get_insertion_event_backwards_extremities_in_room", - get_insertion_event_backwards_extremities_in_room_txn, + "get_insertion_event_backward_extremities_in_room", + get_insertion_event_backward_extremities_in_room_txn, room_id, ) @@ -997,143 +1009,242 @@ def get_forward_extremeties_for_room_txn(txn): "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn ) - async def get_backfill_events(self, room_id: str, event_list: list, limit: int): - """Get a list of Events for a given topic that occurred before (and - including) the events in event_list. Return a list of max size `limit` + def _get_connected_batch_event_backfill_results_txn( + self, txn: LoggingTransaction, insertion_event_id: str, limit: int + ) -> List[BackfillQueueNavigationItem]: + """ + Find any batch connections of a given insertion event. + A batch event points at a insertion event via: + batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID] Args: - room_id - event_list - limit + txn: The database transaction to use + insertion_event_id: The event ID to navigate from. We will find + batch events that point back at this insertion event. + limit: Max number of event ID's to query for and return + + Returns: + List of batch events that the backfill queue can process + """ + batch_connection_query = """ + SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i + /* Find the batch that connects to the given insertion event */ + INNER JOIN batch_events AS c + ON i.next_batch_id = c.batch_id + /* Get the depth of the batch start event from the events table */ + INNER JOIN events AS e USING (event_id) + /* Find an insertion event which matches the given event_id */ + WHERE i.event_id = ? + LIMIT ? """ - event_ids = await self.db_pool.runInteraction( - "get_backfill_events", - self._get_backfill_events, - room_id, - event_list, - limit, - ) - events = await self.get_events_as_list(event_ids) - return sorted(events, key=lambda e: -e.depth) - def _get_backfill_events(self, txn, room_id, event_list, limit): - logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit) + # Find any batch connections for the given insertion event + txn.execute( + batch_connection_query, + (insertion_event_id, limit), + ) + return [ + BackfillQueueNavigationItem( + depth=row[0], + stream_ordering=row[1], + event_id=row[2], + type=row[3], + ) + for row in txn + ] - event_results = set() + def _get_connected_prev_event_backfill_results_txn( + self, txn: LoggingTransaction, event_id: str, limit: int + ) -> List[BackfillQueueNavigationItem]: + """ + Find any events connected by prev_event the specified event_id. - # We want to make sure that we do a breadth-first, "depth" ordered - # search. + Args: + txn: The database transaction to use + event_id: The event ID to navigate from + limit: Max number of event ID's to query for and return + Returns: + List of prev events that the backfill queue can process + """ # Look for the prev_event_id connected to the given event_id - query = """ - SELECT depth, prev_event_id FROM event_edges - /* Get the depth of the prev_event_id from the events table */ + connected_prev_event_query = """ + SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges + /* Get the depth and stream_ordering of the prev_event_id from the events table */ INNER JOIN events ON prev_event_id = events.event_id - /* Find an event which matches the given event_id */ + /* Look for an edge which matches the given event_id */ WHERE event_edges.event_id = ? AND event_edges.is_state = ? + /* Because we can have many events at the same depth, + * we want to also tie-break and sort on stream_ordering */ + ORDER BY depth DESC, stream_ordering DESC LIMIT ? """ - # Look for the "insertion" events connected to the given event_id - connected_insertion_event_query = """ - SELECT e.depth, i.event_id FROM insertion_event_edges AS i - /* Get the depth of the insertion event from the events table */ - INNER JOIN events AS e USING (event_id) - /* Find an insertion event which points via prev_events to the given event_id */ - WHERE i.insertion_prev_event_id = ? - LIMIT ? + txn.execute( + connected_prev_event_query, + (event_id, False, limit), + ) + return [ + BackfillQueueNavigationItem( + depth=row[0], + stream_ordering=row[1], + event_id=row[2], + type=row[3], + ) + for row in txn + ] + + async def get_backfill_events( + self, room_id: str, seed_event_id_list: list, limit: int + ): + """Get a list of Events for a given topic that occurred before (and + including) the events in seed_event_id_list. Return a list of max size `limit` + + Args: + room_id + seed_event_id_list + limit """ + event_ids = await self.db_pool.runInteraction( + "get_backfill_events", + self._get_backfill_events, + room_id, + seed_event_id_list, + limit, + ) + events = await self.get_events_as_list(event_ids) + return sorted( + events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering) + ) - # Find any batch connections of a given insertion event - batch_connection_query = """ - SELECT e.depth, c.event_id FROM insertion_events AS i - /* Find the batch that connects to the given insertion event */ - INNER JOIN batch_events AS c - ON i.next_batch_id = c.batch_id - /* Get the depth of the batch start event from the events table */ - INNER JOIN events AS e USING (event_id) - /* Find an insertion event which matches the given event_id */ - WHERE i.event_id = ? - LIMIT ? + def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit): + """ + We want to make sure that we do a breadth-first, "depth" ordered search. + We also handle navigating historical branches of history connected by + insertion and batch events. """ + logger.debug( + "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s", + room_id, + seed_event_id_list, + limit, + ) + + event_id_results = set() # In a PriorityQueue, the lowest valued entries are retrieved first. - # We're using depth as the priority in the queue. - # Depth is lowest at the oldest-in-time message and highest and - # newest-in-time message. We add events to the queue with a negative depth so that - # we process the newest-in-time messages first going backwards in time. + # We're using depth as the priority in the queue and tie-break based on + # stream_ordering. Depth is lowest at the oldest-in-time message and + # highest and newest-in-time message. We add events to the queue with a + # negative depth so that we process the newest-in-time messages first + # going backwards in time. stream_ordering follows the same pattern. queue = PriorityQueue() - for event_id in event_list: - depth = self.db_pool.simple_select_one_onecol_txn( + for seed_event_id in seed_event_id_list: + event_lookup_result = self.db_pool.simple_select_one_txn( txn, table="events", - keyvalues={"event_id": event_id, "room_id": room_id}, - retcol="depth", + keyvalues={"event_id": seed_event_id, "room_id": room_id}, + retcols=( + "type", + "depth", + "stream_ordering", + ), allow_none=True, ) - if depth: - queue.put((-depth, event_id)) + if event_lookup_result is not None: + logger.debug( + "_get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s", + room_id, + seed_event_id, + event_lookup_result["depth"], + event_lookup_result["stream_ordering"], + event_lookup_result["type"], + ) - while not queue.empty() and len(event_results) < limit: + if event_lookup_result["depth"]: + queue.put( + ( + -event_lookup_result["depth"], + -event_lookup_result["stream_ordering"], + seed_event_id, + event_lookup_result["type"], + ) + ) + + while not queue.empty() and len(event_id_results) < limit: try: - _, event_id = queue.get_nowait() + _, _, event_id, event_type = queue.get_nowait() except Empty: break - if event_id in event_results: + if event_id in event_id_results: continue - event_results.add(event_id) + event_id_results.add(event_id) # Try and find any potential historical batches of message history. - # - # First we look for an insertion event connected to the current - # event (by prev_event). If we find any, we need to go and try to - # find any batch events connected to the insertion event (by - # batch_id). If we find any, we'll add them to the queue and - # navigate up the DAG like normal in the next iteration of the loop. - txn.execute( - connected_insertion_event_query, (event_id, limit - len(event_results)) - ) - connected_insertion_event_id_results = txn.fetchall() - logger.debug( - "_get_backfill_events: connected_insertion_event_query %s", - connected_insertion_event_id_results, - ) - for row in connected_insertion_event_id_results: - connected_insertion_event_depth = row[0] - connected_insertion_event = row[1] - queue.put((-connected_insertion_event_depth, connected_insertion_event)) + if self.hs.config.experimental.msc2716_enabled: + # We need to go and try to find any batch events connected + # to a given insertion event (by batch_id). If we find any, we'll + # add them to the queue and navigate up the DAG like normal in the + # next iteration of the loop. + if event_type == EventTypes.MSC2716_INSERTION: + # Find any batch connections for the given insertion event + connected_batch_event_backfill_results = ( + self._get_connected_batch_event_backfill_results_txn( + txn, event_id, limit - len(event_id_results) + ) + ) + logger.debug( + "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s", + room_id, + connected_batch_event_backfill_results, + ) + for ( + connected_batch_event_backfill_item + ) in connected_batch_event_backfill_results: + if ( + connected_batch_event_backfill_item.event_id + not in event_id_results + ): + queue.put( + ( + -connected_batch_event_backfill_item.depth, + -connected_batch_event_backfill_item.stream_ordering, + connected_batch_event_backfill_item.event_id, + connected_batch_event_backfill_item.type, + ) + ) - # Find any batch connections for the given insertion event - txn.execute( - batch_connection_query, - (connected_insertion_event, limit - len(event_results)), - ) - batch_start_event_id_results = txn.fetchall() - logger.debug( - "_get_backfill_events: batch_start_event_id_results %s", - batch_start_event_id_results, + # Now we just look up the DAG by prev_events as normal + connected_prev_event_backfill_results = ( + self._get_connected_prev_event_backfill_results_txn( + txn, event_id, limit - len(event_id_results) ) - for row in batch_start_event_id_results: - if row[1] not in event_results: - queue.put((-row[0], row[1])) - - txn.execute(query, (event_id, False, limit - len(event_results))) - prev_event_id_results = txn.fetchall() + ) logger.debug( - "_get_backfill_events: prev_event_ids %s", prev_event_id_results + "_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s", + room_id, + connected_prev_event_backfill_results, ) + for ( + connected_prev_event_backfill_item + ) in connected_prev_event_backfill_results: + if connected_prev_event_backfill_item.event_id not in event_id_results: + queue.put( + ( + -connected_prev_event_backfill_item.depth, + -connected_prev_event_backfill_item.stream_ordering, + connected_prev_event_backfill_item.event_id, + connected_prev_event_backfill_item.type, + ) + ) - for row in prev_event_id_results: - if row[1] not in event_results: - queue.put((-row[0], row[1])) - - return event_results + return event_id_results async def get_missing_events(self, room_id, earliest_events, latest_events, limit): ids = await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b7554154ac19..b804185c404e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2215,9 +2215,14 @@ def _update_backward_extremeties(self, txn, events): " SELECT 1 FROM event_backward_extremities" " WHERE event_id = ? AND room_id = ?" " )" + # 1. Don't add an event as a extremity again if we already persisted it + # as a non-outlier. + # 2. Don't add an outlier as an extremity if it has no prev_events " AND NOT EXISTS (" - " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " - " AND outlier = ?" + " SELECT 1 FROM events" + " LEFT JOIN event_edges edge" + " ON edge.event_id = events.event_id" + " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)" " )" ) @@ -2243,6 +2248,10 @@ def _update_backward_extremeties(self, txn, events): (ev.event_id, ev.room_id) for ev in events if not ev.internal_metadata.is_outlier() + # If we encountered an event with no prev_events, then we might + # as well remove it now because it won't ever have anything else + # to backfill from. + or len(ev.prev_event_ids()) == 0 ], )