From 996057c3f42272c0e32bac19b698aeefcb8136c3 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 1 Oct 2021 18:41:50 -0500 Subject: [PATCH 1/7] Refactor /batch_send into some separate functions --- synapse/rest/client/room_batch.py | 361 +++++++++++++++++++----------- 1 file changed, 229 insertions(+), 132 deletions(-) diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index 1dffcc314793..0e0859edf5fd 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -173,41 +173,18 @@ async def _create_requester_for_user_id_from_app_service( return create_requester(user_id, app_service=app_service) - async def on_POST( - self, request: SynapseRequest, room_id: str - ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=False) - - if not requester.app_service: - raise AuthError( - HTTPStatus.FORBIDDEN, - "Only application services can use the /batchsend endpoint", - ) - - body = parse_json_object_from_request(request) - assert_params_in_dict(body, ["state_events_at_start", "events"]) - - assert request.args is not None - prev_event_ids_from_query = parse_strings_from_args( - request.args, "prev_event_id" - ) - batch_id_from_query = parse_string(request, "batch_id") - - if prev_event_ids_from_query is None: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "prev_event query parameter is required when inserting historical messages back in time", - errcode=Codes.MISSING_PARAM, - ) + async def _getMostRecentAuthEventIdsFromEventIdList( + self, event_ids: List[str] + ) -> List[str]: + """Find the most recent auth event ids (derived from state events) that + allowed that message to be sent. We will use that as a base + to auth our historical messages against. + """ - # For the event we are inserting next to (`prev_event_ids_from_query`), - # find the most recent auth events (derived from state events) that - # allowed that message to be sent. We will use that as a base - # to auth our historical messages against. ( most_recent_prev_event_id, _, - ) = await self.store.get_max_depth_of(prev_event_ids_from_query) + ) = await self.store.get_max_depth_of(event_ids) # mapping from (type, state_key) -> state_event_id prev_state_map = await self.state_store.get_state_ids_for_event( most_recent_prev_event_id @@ -216,8 +193,18 @@ async def on_POST( prev_state_ids = list(prev_state_map.values()) auth_event_ids = prev_state_ids + return auth_event_ids + + async def _persistStateEventsAtStart( + self, + state_events_at_start: List[str], + room_id: str, + initial_auth_event_ids: List[str], + requester: Requester, + ) -> List[str]: state_event_ids_at_start = [] - for state_event in body["state_events_at_start"]: + auth_event_ids = initial_auth_event_ids.copy() + for state_event in state_events_at_start: assert_params_in_dict( state_event, ["type", "origin_server_ts", "content", "sender"] ) @@ -240,7 +227,8 @@ async def on_POST( # Mark all events as historical event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - # Make the state events float off on their own + # Make the state events float off on their own so we don't have a + # bunch of `@mxid joined the room` noise between each batch fake_prev_event_id = "$" + random_string(43) # TODO: This is pretty much the same as some other code to handle inserting state in this file @@ -285,103 +273,18 @@ async def on_POST( state_event_ids_at_start.append(event_id) auth_event_ids.append(event_id) - events_to_create = body["events"] - - inherited_depth = await self._inherit_depth_from_prev_ids( - prev_event_ids_from_query - ) - - # Figure out which batch to connect to. If they passed in - # batch_id_from_query let's use it. The batch ID passed in comes - # from the batch_id in the "insertion" event from the previous batch. - last_event_in_batch = events_to_create[-1] - batch_id_to_connect_to = batch_id_from_query - base_insertion_event = None - if batch_id_from_query: - # All but the first base insertion event should point at a fake - # event, which causes the HS to ask for the state at the start of - # the batch later. - prev_event_ids = [fake_prev_event_id] - - # Verify the batch_id_from_query corresponds to an actual insertion event - # and have the batch connected. - corresponding_insertion_event_id = ( - await self.store.get_insertion_event_by_batch_id( - room_id, batch_id_from_query - ) - ) - if corresponding_insertion_event_id is None: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "No insertion event corresponds to the given ?batch_id", - errcode=Codes.INVALID_PARAM, - ) - pass - # Otherwise, create an insertion event to act as a starting point. - # - # We don't always have an insertion event to start hanging more history - # off of (ideally there would be one in the main DAG, but that's not the - # case if we're wanting to add history to e.g. existing rooms without - # an insertion event), in which case we just create a new insertion event - # that can then get pointed to by a "marker" event later. - else: - prev_event_ids = prev_event_ids_from_query - - base_insertion_event_dict = self._create_insertion_event_dict( - sender=requester.user.to_string(), - room_id=room_id, - origin_server_ts=last_event_in_batch["origin_server_ts"], - ) - base_insertion_event_dict["prev_events"] = prev_event_ids.copy() + return state_event_ids_at_start - ( - base_insertion_event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self._create_requester_for_user_id_from_app_service( - base_insertion_event_dict["sender"], - requester.app_service, - ), - base_insertion_event_dict, - prev_event_ids=base_insertion_event_dict.get("prev_events"), - auth_event_ids=auth_event_ids, - historical=True, - depth=inherited_depth, - ) - - batch_id_to_connect_to = base_insertion_event["content"][ - EventContentFields.MSC2716_NEXT_BATCH_ID - ] - - # Connect this current batch to the insertion event from the previous batch - batch_event = { - "type": EventTypes.MSC2716_BATCH, - "sender": requester.user.to_string(), - "room_id": room_id, - "content": { - EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, - EventContentFields.MSC2716_HISTORICAL: True, - }, - # Since the batch event is put at the end of the batch, - # where the newest-in-time event is, copy the origin_server_ts from - # the last event we're inserting - "origin_server_ts": last_event_in_batch["origin_server_ts"], - } - # Add the batch event to the end of the batch (newest-in-time) - events_to_create.append(batch_event) - - # Add an "insertion" event to the start of each batch (next to the oldest-in-time - # event in the batch) so the next batch can be connected to this one. - insertion_event = self._create_insertion_event_dict( - sender=requester.user.to_string(), - room_id=room_id, - # Since the insertion event is put at the start of the batch, - # where the oldest-in-time event is, copy the origin_server_ts from - # the first event we're inserting - origin_server_ts=events_to_create[0]["origin_server_ts"], - ) - # Prepend the insertion event to the start of the batch (oldest-in-time) - events_to_create = [insertion_event] + events_to_create + async def _persistHistoricalEvents( + self, + events_to_create: List[str], + room_id: str, + initial_prev_event_ids: List[str], + inherited_depth: int, + auth_event_ids: List[str], + requester: Requester, + ) -> List[str]: + prev_event_ids = initial_prev_event_ids.copy() event_ids = [] events_to_persist = [] @@ -440,6 +343,202 @@ async def on_POST( context=context, ) + return event_ids + + async def _handleBatchOfEvents( + self, + events_to_create: List[str], + room_id: str, + batch_id_to_connect_to: str, + initial_prev_event_ids: List[str], + inherited_depth: int, + auth_event_ids: List[str], + requester: Requester, + ) -> Tuple[List[str], str]: + """ + Handles creating and persisting all of the historical events as well + as insertion and batch meta events to make the batch navigable in the DAG. + + Returns: + Tuple containing a list of created events and the next_batch_id + """ + + # Connect this current batch to the insertion event from the previous batch + last_event_in_batch = events_to_create[-1] + batch_event = { + "type": EventTypes.MSC2716_BATCH, + "sender": requester.user.to_string(), + "room_id": room_id, + "content": { + EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, + EventContentFields.MSC2716_HISTORICAL: True, + }, + # Since the batch event is put at the end of the batch, + # where the newest-in-time event is, copy the origin_server_ts from + # the last event we're inserting + "origin_server_ts": last_event_in_batch["origin_server_ts"], + } + # Add the batch event to the end of the batch (newest-in-time) + events_to_create.append(batch_event) + + # Add an "insertion" event to the start of each batch (next to the oldest-in-time + # event in the batch) so the next batch can be connected to this one. + insertion_event = self._create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, + # Since the insertion event is put at the start of the batch, + # where the oldest-in-time event is, copy the origin_server_ts from + # the first event we're inserting + origin_server_ts=events_to_create[0]["origin_server_ts"], + ) + next_batch_id = insertion_event["content"][ + EventContentFields.MSC2716_NEXT_BATCH_ID + ] + # Prepend the insertion event to the start of the batch (oldest-in-time) + events_to_create = [insertion_event] + events_to_create + + # Create and persist all of the historical events + event_ids = await self._persistHistoricalEvents( + events_to_create=events_to_create, + room_id=room_id, + initial_prev_event_ids=initial_prev_event_ids, + inherited_depth=inherited_depth, + auth_event_ids=auth_event_ids, + requester=requester, + ) + + return event_ids, next_batch_id + + async def on_POST( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=False) + + if not requester.app_service: + raise AuthError( + HTTPStatus.FORBIDDEN, + "Only application services can use the /batchsend endpoint", + ) + + body = parse_json_object_from_request(request) + assert_params_in_dict(body, ["state_events_at_start", "events"]) + + assert request.args is not None + prev_event_ids_from_query = parse_strings_from_args( + request.args, "prev_event_id" + ) + batch_id_from_query = parse_string(request, "batch_id") + + if prev_event_ids_from_query is None: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "prev_event query parameter is required when inserting historical messages back in time", + errcode=Codes.MISSING_PARAM, + ) + + # Verify the batch_id_from_query corresponds to an actual insertion event + # and have the batch connected. + if batch_id_from_query: + corresponding_insertion_event_id = ( + await self.store.get_insertion_event_by_batch_id( + room_id, batch_id_from_query + ) + ) + if corresponding_insertion_event_id is None: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "No insertion event corresponds to the given ?batch_id", + errcode=Codes.INVALID_PARAM, + ) + pass + + # For the event we are inserting next to (`prev_event_ids_from_query`), + # find the most recent auth events (derived from state events) that + # allowed that message to be sent. We will use that as a base + # to auth our historical messages against. + auth_event_ids = await self._getMostRecentAuthEventIdsFromEventIdList( + prev_event_ids_from_query + ) + + # Create and persist all of the state events that float off on their own + # before the batch. These will most likely be all of the invite/member + # state events used to auth the upcoming historical messages. + state_event_ids_at_start = await self._persistStateEventsAtStart( + state_events_at_start=body["state_events_at_start"], + room_id=room_id, + initial_auth_event_ids=auth_event_ids, + requester=requester, + ) + # Update our ongoing auth event ID list with all of the new state we + # just created + auth_event_ids.extend(state_event_ids_at_start) + + inherited_depth = await self._inherit_depth_from_prev_ids( + prev_event_ids_from_query + ) + + events_to_create = body["events"] + + # Figure out which batch to connect to. If they passed in + # batch_id_from_query let's use it. The batch ID passed in comes + # from the batch_id in the "insertion" event from the previous batch. + last_event_in_batch = events_to_create[-1] + batch_id_to_connect_to = batch_id_from_query + base_insertion_event = None + if batch_id_from_query: + # All but the first base insertion event should point at a fake + # event, which causes the HS to ask for the state at the start of + # the batch later. + fake_prev_event_id = "$" + random_string(43) + prev_event_ids = [fake_prev_event_id] + # Otherwise, create an insertion event to act as a starting point. + # + # We don't always have an insertion event to start hanging more history + # off of (ideally there would be one in the main DAG, but that's not the + # case if we're wanting to add history to e.g. existing rooms without + # an insertion event), in which case we just create a new insertion event + # that can then get pointed to by a "marker" event later. + else: + prev_event_ids = prev_event_ids_from_query + + base_insertion_event_dict = self._create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, + origin_server_ts=last_event_in_batch["origin_server_ts"], + ) + base_insertion_event_dict["prev_events"] = prev_event_ids.copy() + + ( + base_insertion_event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + await self._create_requester_for_user_id_from_app_service( + base_insertion_event_dict["sender"], + requester.app_service, + ), + base_insertion_event_dict, + prev_event_ids=base_insertion_event_dict.get("prev_events"), + auth_event_ids=auth_event_ids, + historical=True, + depth=inherited_depth, + ) + + batch_id_to_connect_to = base_insertion_event["content"][ + EventContentFields.MSC2716_NEXT_BATCH_ID + ] + + # Create and persist all of the historical events as well as insertion + # and batch meta events to make the batch navigable in the DAG. + event_ids, next_batch_id = await self._handleBatchOfEvents( + events_to_create=events_to_create, + room_id=room_id, + batch_id_to_connect_to=batch_id_to_connect_to, + initial_prev_event_ids=prev_event_ids, + inherited_depth=inherited_depth, + auth_event_ids=auth_event_ids, + requester=requester, + ) + insertion_event_id = event_ids[0] batch_event_id = event_ids[-1] historical_event_ids = event_ids[1:-1] @@ -447,9 +546,7 @@ async def on_POST( response_dict = { "state_event_ids": state_event_ids_at_start, "event_ids": historical_event_ids, - "next_batch_id": insertion_event["content"][ - EventContentFields.MSC2716_NEXT_BATCH_ID - ], + "next_batch_id": next_batch_id, "insertion_event_id": insertion_event_id, "batch_event_id": batch_event_id, } From fdb8932bf019f2c127b2aedf846c3740a4d72686 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 1 Oct 2021 18:59:25 -0500 Subject: [PATCH 2/7] Add some comments docs --- synapse/rest/client/room_batch.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index 0e0859edf5fd..5ca10aff6e75 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -202,6 +202,15 @@ async def _persistStateEventsAtStart( initial_auth_event_ids: List[str], requester: Requester, ) -> List[str]: + """Takes all `state_events_at_start` event dictionaries and creates/persists + them as floating state events which don't resolve into the current room state. + They are floating because they reference a fake prev_event which doesn't connect + to the normal DAG at all. + + Returns: + List of state event ID's we just persisted + """ + state_event_ids_at_start = [] auth_event_ids = initial_auth_event_ids.copy() for state_event in state_events_at_start: @@ -284,6 +293,17 @@ async def _persistHistoricalEvents( auth_event_ids: List[str], requester: Requester, ) -> List[str]: + """Create and persists all events provided sequentially. Handles the + complexity of creating events in chronological order so they can + reference each other by prev_event but still persists in + reverse-chronoloical order so they have the correct + (topological_ordering, stream_ordering) and sort correctly from + /messages. + + Returns: + List of persisted event IDs + """ + prev_event_ids = initial_prev_event_ids.copy() event_ids = [] From 5ca9f141509ffd76340f11f38862726731edda82 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 1 Oct 2021 20:55:35 -0500 Subject: [PATCH 3/7] Move logic to handler --- synapse/handlers/room_batch.py | 373 ++++++++++++++++++++++++++++ synapse/rest/client/room_batch.py | 392 ++---------------------------- synapse/server.py | 5 + 3 files changed, 403 insertions(+), 367 deletions(-) create mode 100644 synapse/handlers/room_batch.py diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py new file mode 100644 index 000000000000..8fb665004e85 --- /dev/null +++ b/synapse/handlers/room_batch.py @@ -0,0 +1,373 @@ +import logging +from typing import TYPE_CHECKING, List, Tuple + +from synapse.http.servlet import ( + assert_params_in_dict, +) +from synapse.api.constants import EventContentFields, EventTypes +from synapse.appservice import ApplicationService +from synapse.types import JsonDict, Requester, UserID, create_requester +from synapse.util.stringutils import random_string + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class RoomBatchHandler: + """Contains some read only APIs to get state about a room""" + + def __init__(self, hs: "HomeServer"): + self.hs = hs + self.store = hs.get_datastore() + self.state_store = hs.get_storage().state + self.event_creation_handler = hs.get_event_creation_handler() + self.room_member_handler = hs.get_room_member_handler() + self.auth = hs.get_auth() + + async def inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int: + ( + most_recent_prev_event_id, + most_recent_prev_event_depth, + ) = await self.store.get_max_depth_of(prev_event_ids) + + # We want to insert the historical event after the `prev_event` but before the successor event + # + # We inherit depth from the successor event instead of the `prev_event` + # because events returned from `/messages` are first sorted by `topological_ordering` + # which is just the `depth` and then tie-break with `stream_ordering`. + # + # We mark these inserted historical events as "backfilled" which gives them a + # negative `stream_ordering`. If we use the same depth as the `prev_event`, + # then our historical event will tie-break and be sorted before the `prev_event` + # when it should come after. + # + # We want to use the successor event depth so they appear after `prev_event` because + # it has a larger `depth` but before the successor event because the `stream_ordering` + # is negative before the successor event. + successor_event_ids = await self.store.get_successor_events( + [most_recent_prev_event_id] + ) + + # If we can't find any successor events, then it's a forward extremity of + # historical messages and we can just inherit from the previous historical + # event which we can already assume has the correct depth where we want + # to insert into. + if not successor_event_ids: + depth = most_recent_prev_event_depth + else: + ( + _, + oldest_successor_depth, + ) = await self.store.get_min_depth_of(successor_event_ids) + + depth = oldest_successor_depth + + return depth + + def create_insertion_event_dict( + self, sender: str, room_id: str, origin_server_ts: int + ) -> JsonDict: + """Creates an event dict for an "insertion" event with the proper fields + and a random batch ID. + + Args: + sender: The event author MXID + room_id: The room ID that the event belongs to + origin_server_ts: Timestamp when the event was sent + + Returns: + The new event dictionary to insert. + """ + + next_batch_id = random_string(8) + insertion_event = { + "type": EventTypes.MSC2716_INSERTION, + "sender": sender, + "room_id": room_id, + "content": { + EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id, + EventContentFields.MSC2716_HISTORICAL: True, + }, + "origin_server_ts": origin_server_ts, + } + + return insertion_event + + async def create_requester_for_user_id_from_app_service( + self, user_id: str, app_service: ApplicationService + ) -> Requester: + """Creates a new requester for the given user_id + and validates that the app service is allowed to control + the given user. + + Args: + user_id: The author MXID that the app service is controlling + app_service: The app service that controls the user + + Returns: + Requester object + """ + + await self.auth.validate_appservice_can_control_user_id(app_service, user_id) + + return create_requester(user_id, app_service=app_service) + + async def getMostRecentAuthEventIdsFromEventIdList( + self, event_ids: List[str] + ) -> List[str]: + """Find the most recent auth event ids (derived from state events) that + allowed that message to be sent. We will use that as a base + to auth our historical messages against. + """ + + ( + most_recent_prev_event_id, + _, + ) = await self.store.get_max_depth_of(event_ids) + # mapping from (type, state_key) -> state_event_id + prev_state_map = await self.state_store.get_state_ids_for_event( + most_recent_prev_event_id + ) + # List of state event ID's + prev_state_ids = list(prev_state_map.values()) + auth_event_ids = prev_state_ids + + return auth_event_ids + + async def persistStateEventsAtStart( + self, + state_events_at_start: List[JsonDict], + room_id: str, + initial_auth_event_ids: List[str], + requester: Requester, + ) -> List[str]: + """Takes all `state_events_at_start` event dictionaries and creates/persists + them as floating state events which don't resolve into the current room state. + They are floating because they reference a fake prev_event which doesn't connect + to the normal DAG at all. + + Returns: + List of state event ID's we just persisted + """ + assert requester.app_service + + state_event_ids_at_start = [] + auth_event_ids = initial_auth_event_ids.copy() + for state_event in state_events_at_start: + assert_params_in_dict( + state_event, ["type", "origin_server_ts", "content", "sender"] + ) + + logger.debug( + "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s", + state_event, + auth_event_ids, + ) + + event_dict = { + "type": state_event["type"], + "origin_server_ts": state_event["origin_server_ts"], + "content": state_event["content"], + "room_id": room_id, + "sender": state_event["sender"], + "state_key": state_event["state_key"], + } + + # Mark all events as historical + event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True + + # Make the state events float off on their own so we don't have a + # bunch of `@mxid joined the room` noise between each batch + fake_prev_event_id = "$" + random_string(43) + + # TODO: This is pretty much the same as some other code to handle inserting state in this file + if event_dict["type"] == EventTypes.Member: + membership = event_dict["content"].get("membership", None) + event_id, _ = await self.room_member_handler.update_membership( + await self.create_requester_for_user_id_from_app_service( + state_event["sender"], requester.app_service + ), + target=UserID.from_string(event_dict["state_key"]), + room_id=room_id, + action=membership, + content=event_dict["content"], + outlier=True, + prev_event_ids=[fake_prev_event_id], + # Make sure to use a copy of this list because we modify it + # later in the loop here. Otherwise it will be the same + # reference and also update in the event when we append later. + auth_event_ids=auth_event_ids.copy(), + ) + else: + # TODO: Add some complement tests that adds state that is not member joins + # and will use this code path. Maybe we only want to support join state events + # and can get rid of this `else`? + ( + event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + await self.create_requester_for_user_id_from_app_service( + state_event["sender"], requester.app_service + ), + event_dict, + outlier=True, + prev_event_ids=[fake_prev_event_id], + # Make sure to use a copy of this list because we modify it + # later in the loop here. Otherwise it will be the same + # reference and also update in the event when we append later. + auth_event_ids=auth_event_ids.copy(), + ) + event_id = event.event_id + + state_event_ids_at_start.append(event_id) + auth_event_ids.append(event_id) + + return state_event_ids_at_start + + async def persistHistoricalEvents( + self, + events_to_create: List[JsonDict], + room_id: str, + initial_prev_event_ids: List[str], + inherited_depth: int, + auth_event_ids: List[str], + requester: Requester, + ) -> List[str]: + """Create and persists all events provided sequentially. Handles the + complexity of creating events in chronological order so they can + reference each other by prev_event but still persists in + reverse-chronoloical order so they have the correct + (topological_ordering, stream_ordering) and sort correctly from + /messages. + + Returns: + List of persisted event IDs + """ + assert requester.app_service + + prev_event_ids = initial_prev_event_ids.copy() + + event_ids = [] + events_to_persist = [] + for ev in events_to_create: + assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) + + event_dict = { + "type": ev["type"], + "origin_server_ts": ev["origin_server_ts"], + "content": ev["content"], + "room_id": room_id, + "sender": ev["sender"], # requester.user.to_string(), + "prev_events": prev_event_ids.copy(), + } + + # Mark all events as historical + event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True + + event, context = await self.event_creation_handler.create_event( + await self.create_requester_for_user_id_from_app_service( + ev["sender"], requester.app_service + ), + event_dict, + prev_event_ids=event_dict.get("prev_events"), + auth_event_ids=auth_event_ids, + historical=True, + depth=inherited_depth, + ) + logger.debug( + "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s", + event, + prev_event_ids, + auth_event_ids, + ) + + assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( + event.sender, + ) + + events_to_persist.append((event, context)) + event_id = event.event_id + + event_ids.append(event_id) + prev_event_ids = [event_id] + + # Persist events in reverse-chronological order so they have the + # correct stream_ordering as they are backfilled (which decrements). + # Events are sorted by (topological_ordering, stream_ordering) + # where topological_ordering is just depth. + for (event, context) in reversed(events_to_persist): + await self.event_creation_handler.handle_new_client_event( + await self.create_requester_for_user_id_from_app_service( + event["sender"], requester.app_service + ), + event=event, + context=context, + ) + + return event_ids + + async def handleBatchOfEvents( + self, + events_to_create: List[JsonDict], + room_id: str, + batch_id_to_connect_to: str, + initial_prev_event_ids: List[str], + inherited_depth: int, + auth_event_ids: List[str], + requester: Requester, + ) -> Tuple[List[str], str]: + """ + Handles creating and persisting all of the historical events as well + as insertion and batch meta events to make the batch navigable in the DAG. + + Returns: + Tuple containing a list of created events and the next_batch_id + """ + + # Connect this current batch to the insertion event from the previous batch + last_event_in_batch = events_to_create[-1] + batch_event = { + "type": EventTypes.MSC2716_BATCH, + "sender": requester.user.to_string(), + "room_id": room_id, + "content": { + EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, + EventContentFields.MSC2716_HISTORICAL: True, + }, + # Since the batch event is put at the end of the batch, + # where the newest-in-time event is, copy the origin_server_ts from + # the last event we're inserting + "origin_server_ts": last_event_in_batch["origin_server_ts"], + } + # Add the batch event to the end of the batch (newest-in-time) + events_to_create.append(batch_event) + + # Add an "insertion" event to the start of each batch (next to the oldest-in-time + # event in the batch) so the next batch can be connected to this one. + insertion_event = self.create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, + # Since the insertion event is put at the start of the batch, + # where the oldest-in-time event is, copy the origin_server_ts from + # the first event we're inserting + origin_server_ts=events_to_create[0]["origin_server_ts"], + ) + next_batch_id = insertion_event["content"][ + EventContentFields.MSC2716_NEXT_BATCH_ID + ] + # Prepend the insertion event to the start of the batch (oldest-in-time) + events_to_create = [insertion_event] + events_to_create + + # Create and persist all of the historical events + event_ids = await self.persistHistoricalEvents( + events_to_create=events_to_create, + room_id=room_id, + initial_prev_event_ids=initial_prev_event_ids, + inherited_depth=inherited_depth, + auth_event_ids=auth_event_ids, + requester=requester, + ) + + return event_ids, next_batch_id diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index 5ca10aff6e75..d3cae3f2ff33 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -15,13 +15,12 @@ import logging import re from http import HTTPStatus -from typing import TYPE_CHECKING, Awaitable, List, Tuple +from typing import TYPE_CHECKING, Awaitable, Tuple from twisted.web.server import Request -from synapse.api.constants import EventContentFields, EventTypes +from synapse.api.constants import EventContentFields from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.appservice import ApplicationService from synapse.http.server import HttpServer from synapse.http.servlet import ( RestServlet, @@ -32,7 +31,7 @@ ) from synapse.http.site import SynapseRequest from synapse.rest.client.transactions import HttpTransactionCache -from synapse.types import JsonDict, Requester, UserID, create_requester +from synapse.types import JsonDict from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -77,358 +76,12 @@ class RoomBatchSendEventRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() - self.hs = hs self.store = hs.get_datastore() - self.state_store = hs.get_storage().state self.event_creation_handler = hs.get_event_creation_handler() - self.room_member_handler = hs.get_room_member_handler() self.auth = hs.get_auth() + self.room_batch_handler = hs.get_room_batch_handler() self.txns = HttpTransactionCache(hs) - async def _inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int: - ( - most_recent_prev_event_id, - most_recent_prev_event_depth, - ) = await self.store.get_max_depth_of(prev_event_ids) - - # We want to insert the historical event after the `prev_event` but before the successor event - # - # We inherit depth from the successor event instead of the `prev_event` - # because events returned from `/messages` are first sorted by `topological_ordering` - # which is just the `depth` and then tie-break with `stream_ordering`. - # - # We mark these inserted historical events as "backfilled" which gives them a - # negative `stream_ordering`. If we use the same depth as the `prev_event`, - # then our historical event will tie-break and be sorted before the `prev_event` - # when it should come after. - # - # We want to use the successor event depth so they appear after `prev_event` because - # it has a larger `depth` but before the successor event because the `stream_ordering` - # is negative before the successor event. - successor_event_ids = await self.store.get_successor_events( - [most_recent_prev_event_id] - ) - - # If we can't find any successor events, then it's a forward extremity of - # historical messages and we can just inherit from the previous historical - # event which we can already assume has the correct depth where we want - # to insert into. - if not successor_event_ids: - depth = most_recent_prev_event_depth - else: - ( - _, - oldest_successor_depth, - ) = await self.store.get_min_depth_of(successor_event_ids) - - depth = oldest_successor_depth - - return depth - - def _create_insertion_event_dict( - self, sender: str, room_id: str, origin_server_ts: int - ) -> JsonDict: - """Creates an event dict for an "insertion" event with the proper fields - and a random batch ID. - - Args: - sender: The event author MXID - room_id: The room ID that the event belongs to - origin_server_ts: Timestamp when the event was sent - - Returns: - The new event dictionary to insert. - """ - - next_batch_id = random_string(8) - insertion_event = { - "type": EventTypes.MSC2716_INSERTION, - "sender": sender, - "room_id": room_id, - "content": { - EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id, - EventContentFields.MSC2716_HISTORICAL: True, - }, - "origin_server_ts": origin_server_ts, - } - - return insertion_event - - async def _create_requester_for_user_id_from_app_service( - self, user_id: str, app_service: ApplicationService - ) -> Requester: - """Creates a new requester for the given user_id - and validates that the app service is allowed to control - the given user. - - Args: - user_id: The author MXID that the app service is controlling - app_service: The app service that controls the user - - Returns: - Requester object - """ - - await self.auth.validate_appservice_can_control_user_id(app_service, user_id) - - return create_requester(user_id, app_service=app_service) - - async def _getMostRecentAuthEventIdsFromEventIdList( - self, event_ids: List[str] - ) -> List[str]: - """Find the most recent auth event ids (derived from state events) that - allowed that message to be sent. We will use that as a base - to auth our historical messages against. - """ - - ( - most_recent_prev_event_id, - _, - ) = await self.store.get_max_depth_of(event_ids) - # mapping from (type, state_key) -> state_event_id - prev_state_map = await self.state_store.get_state_ids_for_event( - most_recent_prev_event_id - ) - # List of state event ID's - prev_state_ids = list(prev_state_map.values()) - auth_event_ids = prev_state_ids - - return auth_event_ids - - async def _persistStateEventsAtStart( - self, - state_events_at_start: List[str], - room_id: str, - initial_auth_event_ids: List[str], - requester: Requester, - ) -> List[str]: - """Takes all `state_events_at_start` event dictionaries and creates/persists - them as floating state events which don't resolve into the current room state. - They are floating because they reference a fake prev_event which doesn't connect - to the normal DAG at all. - - Returns: - List of state event ID's we just persisted - """ - - state_event_ids_at_start = [] - auth_event_ids = initial_auth_event_ids.copy() - for state_event in state_events_at_start: - assert_params_in_dict( - state_event, ["type", "origin_server_ts", "content", "sender"] - ) - - logger.debug( - "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s", - state_event, - auth_event_ids, - ) - - event_dict = { - "type": state_event["type"], - "origin_server_ts": state_event["origin_server_ts"], - "content": state_event["content"], - "room_id": room_id, - "sender": state_event["sender"], - "state_key": state_event["state_key"], - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - # Make the state events float off on their own so we don't have a - # bunch of `@mxid joined the room` noise between each batch - fake_prev_event_id = "$" + random_string(43) - - # TODO: This is pretty much the same as some other code to handle inserting state in this file - if event_dict["type"] == EventTypes.Member: - membership = event_dict["content"].get("membership", None) - event_id, _ = await self.room_member_handler.update_membership( - await self._create_requester_for_user_id_from_app_service( - state_event["sender"], requester.app_service - ), - target=UserID.from_string(event_dict["state_key"]), - room_id=room_id, - action=membership, - content=event_dict["content"], - outlier=True, - prev_event_ids=[fake_prev_event_id], - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append later. - auth_event_ids=auth_event_ids.copy(), - ) - else: - # TODO: Add some complement tests that adds state that is not member joins - # and will use this code path. Maybe we only want to support join state events - # and can get rid of this `else`? - ( - event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self._create_requester_for_user_id_from_app_service( - state_event["sender"], requester.app_service - ), - event_dict, - outlier=True, - prev_event_ids=[fake_prev_event_id], - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append later. - auth_event_ids=auth_event_ids.copy(), - ) - event_id = event.event_id - - state_event_ids_at_start.append(event_id) - auth_event_ids.append(event_id) - - return state_event_ids_at_start - - async def _persistHistoricalEvents( - self, - events_to_create: List[str], - room_id: str, - initial_prev_event_ids: List[str], - inherited_depth: int, - auth_event_ids: List[str], - requester: Requester, - ) -> List[str]: - """Create and persists all events provided sequentially. Handles the - complexity of creating events in chronological order so they can - reference each other by prev_event but still persists in - reverse-chronoloical order so they have the correct - (topological_ordering, stream_ordering) and sort correctly from - /messages. - - Returns: - List of persisted event IDs - """ - - prev_event_ids = initial_prev_event_ids.copy() - - event_ids = [] - events_to_persist = [] - for ev in events_to_create: - assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) - - event_dict = { - "type": ev["type"], - "origin_server_ts": ev["origin_server_ts"], - "content": ev["content"], - "room_id": room_id, - "sender": ev["sender"], # requester.user.to_string(), - "prev_events": prev_event_ids.copy(), - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - event, context = await self.event_creation_handler.create_event( - await self._create_requester_for_user_id_from_app_service( - ev["sender"], requester.app_service - ), - event_dict, - prev_event_ids=event_dict.get("prev_events"), - auth_event_ids=auth_event_ids, - historical=True, - depth=inherited_depth, - ) - logger.debug( - "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s", - event, - prev_event_ids, - auth_event_ids, - ) - - assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( - event.sender, - ) - - events_to_persist.append((event, context)) - event_id = event.event_id - - event_ids.append(event_id) - prev_event_ids = [event_id] - - # Persist events in reverse-chronological order so they have the - # correct stream_ordering as they are backfilled (which decrements). - # Events are sorted by (topological_ordering, stream_ordering) - # where topological_ordering is just depth. - for (event, context) in reversed(events_to_persist): - ev = await self.event_creation_handler.handle_new_client_event( - await self._create_requester_for_user_id_from_app_service( - event["sender"], requester.app_service - ), - event=event, - context=context, - ) - - return event_ids - - async def _handleBatchOfEvents( - self, - events_to_create: List[str], - room_id: str, - batch_id_to_connect_to: str, - initial_prev_event_ids: List[str], - inherited_depth: int, - auth_event_ids: List[str], - requester: Requester, - ) -> Tuple[List[str], str]: - """ - Handles creating and persisting all of the historical events as well - as insertion and batch meta events to make the batch navigable in the DAG. - - Returns: - Tuple containing a list of created events and the next_batch_id - """ - - # Connect this current batch to the insertion event from the previous batch - last_event_in_batch = events_to_create[-1] - batch_event = { - "type": EventTypes.MSC2716_BATCH, - "sender": requester.user.to_string(), - "room_id": room_id, - "content": { - EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, - EventContentFields.MSC2716_HISTORICAL: True, - }, - # Since the batch event is put at the end of the batch, - # where the newest-in-time event is, copy the origin_server_ts from - # the last event we're inserting - "origin_server_ts": last_event_in_batch["origin_server_ts"], - } - # Add the batch event to the end of the batch (newest-in-time) - events_to_create.append(batch_event) - - # Add an "insertion" event to the start of each batch (next to the oldest-in-time - # event in the batch) so the next batch can be connected to this one. - insertion_event = self._create_insertion_event_dict( - sender=requester.user.to_string(), - room_id=room_id, - # Since the insertion event is put at the start of the batch, - # where the oldest-in-time event is, copy the origin_server_ts from - # the first event we're inserting - origin_server_ts=events_to_create[0]["origin_server_ts"], - ) - next_batch_id = insertion_event["content"][ - EventContentFields.MSC2716_NEXT_BATCH_ID - ] - # Prepend the insertion event to the start of the batch (oldest-in-time) - events_to_create = [insertion_event] + events_to_create - - # Create and persist all of the historical events - event_ids = await self._persistHistoricalEvents( - events_to_create=events_to_create, - room_id=room_id, - initial_prev_event_ids=initial_prev_event_ids, - inherited_depth=inherited_depth, - auth_event_ids=auth_event_ids, - requester=requester, - ) - - return event_ids, next_batch_id - async def on_POST( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: @@ -470,30 +123,33 @@ async def on_POST( "No insertion event corresponds to the given ?batch_id", errcode=Codes.INVALID_PARAM, ) - pass # For the event we are inserting next to (`prev_event_ids_from_query`), # find the most recent auth events (derived from state events) that # allowed that message to be sent. We will use that as a base # to auth our historical messages against. - auth_event_ids = await self._getMostRecentAuthEventIdsFromEventIdList( - prev_event_ids_from_query + auth_event_ids = ( + await self.room_batch_handler.getMostRecentAuthEventIdsFromEventIdList( + prev_event_ids_from_query + ) ) # Create and persist all of the state events that float off on their own # before the batch. These will most likely be all of the invite/member # state events used to auth the upcoming historical messages. - state_event_ids_at_start = await self._persistStateEventsAtStart( - state_events_at_start=body["state_events_at_start"], - room_id=room_id, - initial_auth_event_ids=auth_event_ids, - requester=requester, + state_event_ids_at_start = ( + await self.room_batch_handler.persistStateEventsAtStart( + state_events_at_start=body["state_events_at_start"], + room_id=room_id, + initial_auth_event_ids=auth_event_ids, + requester=requester, + ) ) # Update our ongoing auth event ID list with all of the new state we # just created auth_event_ids.extend(state_event_ids_at_start) - inherited_depth = await self._inherit_depth_from_prev_ids( + inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids( prev_event_ids_from_query ) @@ -503,9 +159,9 @@ async def on_POST( # batch_id_from_query let's use it. The batch ID passed in comes # from the batch_id in the "insertion" event from the previous batch. last_event_in_batch = events_to_create[-1] - batch_id_to_connect_to = batch_id_from_query base_insertion_event = None if batch_id_from_query: + batch_id_to_connect_to = batch_id_from_query # All but the first base insertion event should point at a fake # event, which causes the HS to ask for the state at the start of # the batch later. @@ -521,10 +177,12 @@ async def on_POST( else: prev_event_ids = prev_event_ids_from_query - base_insertion_event_dict = self._create_insertion_event_dict( - sender=requester.user.to_string(), - room_id=room_id, - origin_server_ts=last_event_in_batch["origin_server_ts"], + base_insertion_event_dict = ( + self.room_batch_handler.create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, + origin_server_ts=last_event_in_batch["origin_server_ts"], + ) ) base_insertion_event_dict["prev_events"] = prev_event_ids.copy() @@ -532,7 +190,7 @@ async def on_POST( base_insertion_event, _, ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self._create_requester_for_user_id_from_app_service( + await self.room_batch_handler.create_requester_for_user_id_from_app_service( base_insertion_event_dict["sender"], requester.app_service, ), @@ -549,7 +207,7 @@ async def on_POST( # Create and persist all of the historical events as well as insertion # and batch meta events to make the batch navigable in the DAG. - event_ids, next_batch_id = await self._handleBatchOfEvents( + event_ids, next_batch_id = await self.room_batch_handler.handleBatchOfEvents( events_to_create=events_to_create, room_id=room_id, batch_id_to_connect_to=batch_id_to_connect_to, diff --git a/synapse/server.py b/synapse/server.py index 637eb15b786d..b45c973bc4f1 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -97,6 +97,7 @@ RoomCreationHandler, RoomShutdownHandler, ) +from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler @@ -437,6 +438,10 @@ def get_federation_http_client(self) -> MatrixFederationHttpClient: def get_room_creation_handler(self) -> RoomCreationHandler: return RoomCreationHandler(self) + @cache_in_self + def get_room_batch_handler(self) -> RoomBatchHandler: + return RoomBatchHandler(self) + @cache_in_self def get_room_shutdown_handler(self) -> RoomShutdownHandler: return RoomShutdownHandler(self) From d77a5f6e6ff0d69f1df5275699722f1f262cea5a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 1 Oct 2021 21:05:57 -0500 Subject: [PATCH 4/7] Add changelog: --- changelog.d/10974.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10974.misc diff --git a/changelog.d/10974.misc b/changelog.d/10974.misc new file mode 100644 index 000000000000..8695b378aabb --- /dev/null +++ b/changelog.d/10974.misc @@ -0,0 +1 @@ +Refactor [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` mega function into smaller handler functions. From ee1e24e42d3c761e1f51a136710c139ffb0915d7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 1 Oct 2021 21:36:10 -0500 Subject: [PATCH 5/7] Fix lint --- synapse/handlers/room_batch.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index 8fb665004e85..797862bd0f98 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -1,11 +1,9 @@ import logging from typing import TYPE_CHECKING, List, Tuple -from synapse.http.servlet import ( - assert_params_in_dict, -) from synapse.api.constants import EventContentFields, EventTypes from synapse.appservice import ApplicationService +from synapse.http.servlet import assert_params_in_dict from synapse.types import JsonDict, Requester, UserID, create_requester from synapse.util.stringutils import random_string From 3ccb2dd5917a90ea5b3480adc5a4310111aedaa4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Oct 2021 18:42:35 -0500 Subject: [PATCH 6/7] Use snake_case for functions --- synapse/handlers/room_batch.py | 10 +++++----- synapse/rest/client/room_batch.py | 10 ++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index 797862bd0f98..e25491de3fe9 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -112,7 +112,7 @@ async def create_requester_for_user_id_from_app_service( return create_requester(user_id, app_service=app_service) - async def getMostRecentAuthEventIdsFromEventIdList( + async def get_most_recent_auth_event_ids_from_event_id_list( self, event_ids: List[str] ) -> List[str]: """Find the most recent auth event ids (derived from state events) that @@ -134,7 +134,7 @@ async def getMostRecentAuthEventIdsFromEventIdList( return auth_event_ids - async def persistStateEventsAtStart( + async def persist_state_events_at_start( self, state_events_at_start: List[JsonDict], room_id: str, @@ -224,7 +224,7 @@ async def persistStateEventsAtStart( return state_event_ids_at_start - async def persistHistoricalEvents( + async def persist_historical_events( self, events_to_create: List[JsonDict], room_id: str, @@ -306,7 +306,7 @@ async def persistHistoricalEvents( return event_ids - async def handleBatchOfEvents( + async def handle_batch_of_events( self, events_to_create: List[JsonDict], room_id: str, @@ -359,7 +359,7 @@ async def handleBatchOfEvents( events_to_create = [insertion_event] + events_to_create # Create and persist all of the historical events - event_ids = await self.persistHistoricalEvents( + event_ids = await self.persist_historical_events( events_to_create=events_to_create, room_id=room_id, initial_prev_event_ids=initial_prev_event_ids, diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index d3cae3f2ff33..a3600685db65 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -128,17 +128,15 @@ async def on_POST( # find the most recent auth events (derived from state events) that # allowed that message to be sent. We will use that as a base # to auth our historical messages against. - auth_event_ids = ( - await self.room_batch_handler.getMostRecentAuthEventIdsFromEventIdList( - prev_event_ids_from_query - ) + auth_event_ids = await self.room_batch_handler.get_most_recent_auth_event_ids_from_event_id_list( + prev_event_ids_from_query ) # Create and persist all of the state events that float off on their own # before the batch. These will most likely be all of the invite/member # state events used to auth the upcoming historical messages. state_event_ids_at_start = ( - await self.room_batch_handler.persistStateEventsAtStart( + await self.room_batch_handler.persist_state_events_at_start( state_events_at_start=body["state_events_at_start"], room_id=room_id, initial_auth_event_ids=auth_event_ids, @@ -207,7 +205,7 @@ async def on_POST( # Create and persist all of the historical events as well as insertion # and batch meta events to make the batch navigable in the DAG. - event_ids, next_batch_id = await self.room_batch_handler.handleBatchOfEvents( + event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events( events_to_create=events_to_create, room_id=room_id, batch_id_to_connect_to=batch_id_to_connect_to, From 9d4dd5c10ca2ed2b6bfb40dd7f2b266861e6dacb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Oct 2021 19:16:00 -0500 Subject: [PATCH 7/7] Add docstrings to all batch_send functions --- synapse/handlers/room_batch.py | 82 +++++++++++++++++++++++++------ synapse/rest/client/room_batch.py | 4 +- 2 files changed, 69 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index e25491de3fe9..51dd4e755570 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -14,8 +14,6 @@ class RoomBatchHandler: - """Contains some read only APIs to get state about a room""" - def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastore() @@ -25,6 +23,17 @@ def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() async def inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int: + """Finds the depth which would sort it after the most-recent + prev_event_id but before the successors of those events. If no + successors are found, we assume it's an historical extremity part of the + current batch and use the same depth of the prev_event_ids. + + Args: + prev_event_ids: List of prev event IDs + + Returns: + Inherited depth + """ ( most_recent_prev_event_id, most_recent_prev_event_depth, @@ -116,8 +125,14 @@ async def get_most_recent_auth_event_ids_from_event_id_list( self, event_ids: List[str] ) -> List[str]: """Find the most recent auth event ids (derived from state events) that - allowed that message to be sent. We will use that as a base + allowed that message to be sent. We will use this as a base to auth our historical messages against. + + Args: + event_ids: List of event ID's to look at + + Returns: + List of event ID's """ ( @@ -139,17 +154,26 @@ async def persist_state_events_at_start( state_events_at_start: List[JsonDict], room_id: str, initial_auth_event_ids: List[str], - requester: Requester, + app_service_requester: Requester, ) -> List[str]: """Takes all `state_events_at_start` event dictionaries and creates/persists them as floating state events which don't resolve into the current room state. They are floating because they reference a fake prev_event which doesn't connect to the normal DAG at all. + Args: + state_events_at_start: + room_id: Room where you want the events persisted in. + initial_auth_event_ids: These will be the auth_events for the first + state event created. Each event created afterwards will be + added to the list of auth events for the next state event + created. + app_service_requester: The requester of an application service. + Returns: List of state event ID's we just persisted """ - assert requester.app_service + assert app_service_requester.app_service state_event_ids_at_start = [] auth_event_ids = initial_auth_event_ids.copy() @@ -185,7 +209,7 @@ async def persist_state_events_at_start( membership = event_dict["content"].get("membership", None) event_id, _ = await self.room_member_handler.update_membership( await self.create_requester_for_user_id_from_app_service( - state_event["sender"], requester.app_service + state_event["sender"], app_service_requester.app_service ), target=UserID.from_string(event_dict["state_key"]), room_id=room_id, @@ -207,7 +231,7 @@ async def persist_state_events_at_start( _, ) = await self.event_creation_handler.create_and_send_nonmember_event( await self.create_requester_for_user_id_from_app_service( - state_event["sender"], requester.app_service + state_event["sender"], app_service_requester.app_service ), event_dict, outlier=True, @@ -231,7 +255,7 @@ async def persist_historical_events( initial_prev_event_ids: List[str], inherited_depth: int, auth_event_ids: List[str], - requester: Requester, + app_service_requester: Requester, ) -> List[str]: """Create and persists all events provided sequentially. Handles the complexity of creating events in chronological order so they can @@ -240,10 +264,23 @@ async def persist_historical_events( (topological_ordering, stream_ordering) and sort correctly from /messages. + Args: + events_to_create: List of historical events to create in JSON + dictionary format. + room_id: Room where you want the events persisted in. + initial_prev_event_ids: These will be the prev_events for the first + event created. Each event created afterwards will point to the + previous event created. + inherited_depth: The depth to create the events at (you will + probably by calling inherit_depth_from_prev_ids(...)). + auth_event_ids: Define which events allow you to create the given + event in the room. + app_service_requester: The requester of an application service. + Returns: List of persisted event IDs """ - assert requester.app_service + assert app_service_requester.app_service prev_event_ids = initial_prev_event_ids.copy() @@ -266,7 +303,7 @@ async def persist_historical_events( event, context = await self.event_creation_handler.create_event( await self.create_requester_for_user_id_from_app_service( - ev["sender"], requester.app_service + ev["sender"], app_service_requester.app_service ), event_dict, prev_event_ids=event_dict.get("prev_events"), @@ -298,7 +335,7 @@ async def persist_historical_events( for (event, context) in reversed(events_to_persist): await self.event_creation_handler.handle_new_client_event( await self.create_requester_for_user_id_from_app_service( - event["sender"], requester.app_service + event["sender"], app_service_requester.app_service ), event=event, context=context, @@ -314,12 +351,27 @@ async def handle_batch_of_events( initial_prev_event_ids: List[str], inherited_depth: int, auth_event_ids: List[str], - requester: Requester, + app_service_requester: Requester, ) -> Tuple[List[str], str]: """ Handles creating and persisting all of the historical events as well as insertion and batch meta events to make the batch navigable in the DAG. + Args: + events_to_create: List of historical events to create in JSON + dictionary format. + room_id: Room where you want the events created in. + batch_id_to_connect_to: The batch_id from the insertion event you + want this batch to connect to. + initial_prev_event_ids: These will be the prev_events for the first + event created. Each event created afterwards will point to the + previous event created. + inherited_depth: The depth to create the events at (you will + probably by calling inherit_depth_from_prev_ids(...)). + auth_event_ids: Define which events allow you to create the given + event in the room. + app_service_requester: The requester of an application service. + Returns: Tuple containing a list of created events and the next_batch_id """ @@ -328,7 +380,7 @@ async def handle_batch_of_events( last_event_in_batch = events_to_create[-1] batch_event = { "type": EventTypes.MSC2716_BATCH, - "sender": requester.user.to_string(), + "sender": app_service_requester.user.to_string(), "room_id": room_id, "content": { EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, @@ -345,7 +397,7 @@ async def handle_batch_of_events( # Add an "insertion" event to the start of each batch (next to the oldest-in-time # event in the batch) so the next batch can be connected to this one. insertion_event = self.create_insertion_event_dict( - sender=requester.user.to_string(), + sender=app_service_requester.user.to_string(), room_id=room_id, # Since the insertion event is put at the start of the batch, # where the oldest-in-time event is, copy the origin_server_ts from @@ -365,7 +417,7 @@ async def handle_batch_of_events( initial_prev_event_ids=initial_prev_event_ids, inherited_depth=inherited_depth, auth_event_ids=auth_event_ids, - requester=requester, + app_service_requester=app_service_requester, ) return event_ids, next_batch_id diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index a3600685db65..38ad4c24475b 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -140,7 +140,7 @@ async def on_POST( state_events_at_start=body["state_events_at_start"], room_id=room_id, initial_auth_event_ids=auth_event_ids, - requester=requester, + app_service_requester=requester, ) ) # Update our ongoing auth event ID list with all of the new state we @@ -212,7 +212,7 @@ async def on_POST( initial_prev_event_ids=prev_event_ids, inherited_depth=inherited_depth, auth_event_ids=auth_event_ids, - requester=requester, + app_service_requester=requester, ) insertion_event_id = event_ids[0]