Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster joins: Fix incompatibility with restricted joins #14882

Merged
merged 6 commits into from
Jan 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,11 @@ async def _execute(pdu: EventBase) -> None:
"members_omitted was set, but no servers were listed in the room"
)

if response.members_omitted and not partial_state:
raise InvalidResponseError(
"members_omitted was set, but we asked for full state"
)

return SendJoinResult(
event=event,
state=signed_state,
Expand Down
207 changes: 125 additions & 82 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
FederationError,
FederationPullAttemptBackoffError,
HttpResponseException,
LimitExceededError,
NotFoundError,
RequestSendFailed,
SynapseError,
Expand Down Expand Up @@ -182,6 +181,12 @@ def __init__(self, hs: "HomeServer"):
self._partial_state_syncs_maybe_needing_restart: Dict[
str, Tuple[Optional[str], Collection[str]]
] = {}
# A lock guarding the partial state flag for rooms.
# When the lock is held for a given room, no other concurrent code may
# partial state or un-partial state the room.
self._is_partial_state_room_linearizer = Linearizer(
name="_is_partial_state_room_linearizer"
)

# if this is the main process, fire off a background process to resume
# any partial-state-resync operations which were in flight when we
Expand Down Expand Up @@ -599,7 +604,23 @@ async def do_invite_join(

self._federation_event_handler.room_queues[room_id] = []

await self._clean_room_for_join(room_id)
is_host_joined = await self.store.is_host_joined(room_id, self.server_name)

if not is_host_joined:
# We may have old forward extremities lying around if the homeserver left
# the room completely in the past. Clear them out.
#
# Note that this check-then-clear is subject to races where
# * the homeserver is in the room and stops being in the room just after
# the check. We won't reset the forward extremities, but that's okay,
# since they will be almost up to date.
# * the homeserver is not in the room and starts being in the room just
# after the check. This can't happen, since `RoomMemberHandler` has a
# linearizer lock which prevents concurrent remote joins into the same
# room.
# In short, the races either have an acceptable outcome or should be
# impossible.
await self._clean_room_for_join(room_id)

try:
# Try the host we successfully got a response to /make_join/
Expand All @@ -611,91 +632,106 @@ async def do_invite_join(
except ValueError:
pass

ret = await self.federation_client.send_join(
host_list, event, room_version_obj
)

event = ret.event
origin = ret.origin
state = ret.state
auth_chain = ret.auth_chain
auth_chain.sort(key=lambda e: e.depth)

logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)

logger.debug("do_invite_join event: %s", event)

# if this is the first time we've joined this room, it's time to add
# a row to `rooms` with the correct room version. If there's already a
# row there, we should override it, since it may have been populated
# based on an invite request which lied about the room version.
#
# federation_client.send_join has already checked that the room
# version in the received create event is the same as room_version_obj,
# so we can rely on it now.
#
await self.store.upsert_room_on_join(
room_id=room_id,
room_version=room_version_obj,
state_events=state,
)
async with self._is_partial_state_room_linearizer.queue(room_id):
ret = await self.federation_client.send_join(
host_list,
event,
room_version_obj,
# Perform a full join when we are already in the room and it is a
# full state room, since we are not allowed to persist a partial
# state join event in a full state room. In the future, we could
# optimize this by always performing a partial state join and
# computing the state ourselves or retrieving it from the remote
# homeserver if necessary.
#
# There's a race where we leave the room, then perform a full join
# anyway. This should end up being fast anyway, since we would
# already have the full room state and auth chain persisted.
partial_state=(
not is_host_joined
or await self.store.is_partial_state_room(room_id)
),
)

if ret.partial_state:
# Mark the room as having partial state.
# The background process is responsible for unmarking this flag,
# even if the join fails.
await self.store.store_partial_state_room(
event = ret.event
origin = ret.origin
state = ret.state
auth_chain = ret.auth_chain
auth_chain.sort(key=lambda e: e.depth)

logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)

logger.debug("do_invite_join event: %s", event)

# if this is the first time we've joined this room, it's time to add
# a row to `rooms` with the correct room version. If there's already a
# row there, we should override it, since it may have been populated
# based on an invite request which lied about the room version.
#
# federation_client.send_join has already checked that the room
# version in the received create event is the same as room_version_obj,
# so we can rely on it now.
#
await self.store.upsert_room_on_join(
room_id=room_id,
servers=ret.servers_in_room,
device_lists_stream_id=self.store.get_device_stream_token(),
joined_via=origin,
room_version=room_version_obj,
state_events=state,
)

try:
max_stream_id = (
await self._federation_event_handler.process_remote_join(
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
)
)
except PartialStateConflictError as e:
# The homeserver was already in the room and it is no longer partial
# stated. We ought to be doing a local join instead. Turn the error into
# a 429, as a hint to the client to try again.
# TODO(faster_joins): `_should_perform_remote_join` suggests that we may
# do a remote join for restricted rooms even if we have full state.
logger.error(
"Room %s was un-partial stated while processing remote join.",
room_id,
)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
else:
# Record the join event id for future use (when we finish the full
# join). We have to do this after persisting the event to keep foreign
# key constraints intact.
if ret.partial_state:
await self.store.write_partial_state_rooms_join_event_id(
room_id, event.event_id
)
finally:
# Always kick off the background process that asynchronously fetches
# state for the room.
# If the join failed, the background process is responsible for
# cleaning up — including unmarking the room as a partial state room.
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
# room.
self._start_partial_state_room_sync(
initial_destination=origin,
other_destinations=ret.servers_in_room,
# Mark the room as having partial state.
# The background process is responsible for unmarking this flag,
# even if the join fails.
await self.store.store_partial_state_room(
room_id=room_id,
servers=ret.servers_in_room,
device_lists_stream_id=self.store.get_device_stream_token(),
joined_via=origin,
)

try:
max_stream_id = (
await self._federation_event_handler.process_remote_join(
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
)
)
except PartialStateConflictError:
# This should be impossible, since we hold the lock on the room's
# partial statedness.
logger.error(
"Room %s was un-partial stated while processing remote join.",
room_id,
)
raise
else:
# Record the join event id for future use (when we finish the full
# join). We have to do this after persisting the event to keep
# foreign key constraints intact.
if ret.partial_state:
await self.store.write_partial_state_rooms_join_event_id(
room_id, event.event_id
)
finally:
# Always kick off the background process that asynchronously fetches
# state for the room.
# If the join failed, the background process is responsible for
# cleaning up — including unmarking the room as a partial state
# room.
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for
# this room.
self._start_partial_state_room_sync(
initial_destination=origin,
other_destinations=ret.servers_in_room,
room_id=room_id,
)

# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
Expand Down Expand Up @@ -1778,6 +1814,11 @@ async def _sync_partial_state_room(
`initial_destination` is unavailable
room_id: room to be resynced
"""
# Assume that we run on the main process for now.
# When moving the sync to workers, we need to ensure that
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
# * `_start_partial_state_room_sync` still prevents duplicate resyncs
# * `_is_partial_state_room_linearizer` correctly guards partial state flags
# for rooms between the workers doing remote joins and resync.
assert not self.config.worker.worker_app

# TODO(faster_joins): do we need to lock to avoid races? What happens if other
Expand Down Expand Up @@ -1815,8 +1856,10 @@ async def _sync_partial_state_room(
logger.info("Handling any pending device list updates")
await self._device_handler.handle_room_un_partial_stated(room_id)

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
async with self._is_partial_state_room_linearizer.queue(room_id):
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)

if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
Expand Down