Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Update the room member handler to use async/await. #7507

Merged
merged 2 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7507.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert the room member handler to async/await.
111 changes: 49 additions & 62 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

from six.moves import http_client

from twisted.internet import defer

from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
Expand Down Expand Up @@ -76,7 +74,7 @@ def __init__(self, hs):
self.base_handler = BaseHandler(hs)

@abc.abstractmethod
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
async def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Try and join a room that this server is not in

Args:
Expand All @@ -94,7 +92,7 @@ def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
raise NotImplementedError()

@abc.abstractmethod
def _remote_reject_invite(
async def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
"""Attempt to reject an invite for a room this server is not in. If we
Expand All @@ -115,7 +113,7 @@ def _remote_reject_invite(
raise NotImplementedError()

@abc.abstractmethod
def _user_joined_room(self, target, room_id):
async def _user_joined_room(self, target, room_id):
"""Notifies distributor on master process that the user has joined the
room.

Expand All @@ -124,12 +122,12 @@ def _user_joined_room(self, target, room_id):
room_id (str)

Returns:
Deferred|None
None
"""
raise NotImplementedError()

@abc.abstractmethod
def _user_left_room(self, target, room_id):
async def _user_left_room(self, target, room_id):
"""Notifies distributor on master process that the user has left the
room.

Expand All @@ -138,7 +136,7 @@ def _user_left_room(self, target, room_id):
room_id (str)

Returns:
Deferred|None
None
"""
raise NotImplementedError()

Expand Down Expand Up @@ -214,8 +212,9 @@ async def _local_membership_update(

return event

@defer.inlineCallbacks
def copy_room_tags_and_direct_to_room(self, old_room_id, new_room_id, user_id):
async def copy_room_tags_and_direct_to_room(
self, old_room_id, new_room_id, user_id
):
"""Copies the tags and direct room state from one room to another.

Args:
Expand All @@ -227,7 +226,7 @@ def copy_room_tags_and_direct_to_room(self, old_room_id, new_room_id, user_id):
Deferred[None]
"""
# Retrieve user account data for predecessor room
user_account_data, _ = yield self.store.get_account_data_for_user(user_id)
user_account_data, _ = await self.store.get_account_data_for_user(user_id)

# Copy direct message state if applicable
direct_rooms = user_account_data.get("m.direct", {})
Expand All @@ -240,17 +239,17 @@ def copy_room_tags_and_direct_to_room(self, old_room_id, new_room_id, user_id):
direct_rooms[key].append(new_room_id)

# Save back to user's m.direct account data
yield self.store.add_account_data_for_user(
await self.store.add_account_data_for_user(
user_id, "m.direct", direct_rooms
)
break

# Copy room tags if applicable
room_tags = yield self.store.get_tags_for_room(user_id, old_room_id)
room_tags = await self.store.get_tags_for_room(user_id, old_room_id)

# Copy each room tag to the new room
for tag, tag_content in room_tags.items():
yield self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content)
await self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content)

async def update_membership(
self,
Expand Down Expand Up @@ -487,8 +486,7 @@ async def _update_membership(
)
return res

@defer.inlineCallbacks
def transfer_room_state_on_room_upgrade(self, old_room_id, room_id):
async def transfer_room_state_on_room_upgrade(self, old_room_id, room_id):
"""Upon our server becoming aware of an upgraded room, either by upgrading a room
ourselves or joining one, we can transfer over information from the previous room.

Expand All @@ -506,30 +504,29 @@ def transfer_room_state_on_room_upgrade(self, old_room_id, room_id):
logger.info("Transferring room state from %s to %s", old_room_id, room_id)

# Find all local users that were in the old room and copy over each user's state
users = yield self.store.get_users_in_room(old_room_id)
yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
users = await self.store.get_users_in_room(old_room_id)
await self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)

# Add new room to the room directory if the old room was there
# Remove old room from the room directory
old_room = yield self.store.get_room(old_room_id)
old_room = await self.store.get_room(old_room_id)
if old_room and old_room["is_public"]:
yield self.store.set_room_is_public(old_room_id, False)
yield self.store.set_room_is_public(room_id, True)
await self.store.set_room_is_public(old_room_id, False)
await self.store.set_room_is_public(room_id, True)

# Transfer alias mappings in the room directory
yield self.store.update_aliases_for_room(old_room_id, room_id)
await self.store.update_aliases_for_room(old_room_id, room_id)

# Check if any groups we own contain the predecessor room
local_group_ids = yield self.store.get_local_groups_for_room(old_room_id)
local_group_ids = await self.store.get_local_groups_for_room(old_room_id)
for group_id in local_group_ids:
# Add new the new room to those groups
yield self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
await self.store.add_room_to_group(group_id, room_id, old_room["is_public"])

# Remove the old room from those groups
yield self.store.remove_room_from_group(group_id, old_room_id)
await self.store.remove_room_from_group(group_id, old_room_id)

@defer.inlineCallbacks
def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
async def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
"""Copy user-specific information when they join a new room when that new room is the
result of a room upgrade

Expand All @@ -552,11 +549,11 @@ def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
for user_id in user_ids:
try:
# It is an upgraded room. Copy over old tags
yield self.copy_room_tags_and_direct_to_room(
await self.copy_room_tags_and_direct_to_room(
old_room_id, new_room_id, user_id
)
# Copy over push rules
yield self.store.copy_push_rules_from_room_to_room_for_user(
await self.store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id
)
except Exception:
Expand Down Expand Up @@ -639,16 +636,15 @@ async def send_membership_event(self, requester, event, context, ratelimit=True)
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target_user, room_id)

@defer.inlineCallbacks
def _can_guest_join(self, current_state_ids):
async def _can_guest_join(self, current_state_ids):
"""
Returns whether a guest can join a room based on its current state.
"""
guest_access_id = current_state_ids.get((EventTypes.GuestAccess, ""), None)
if not guest_access_id:
return False

guest_access = yield self.store.get_event(guest_access_id)
guest_access = await self.store.get_event(guest_access_id)

return (
guest_access
Expand All @@ -657,8 +653,7 @@ def _can_guest_join(self, current_state_ids):
and guest_access.content["guest_access"] == "can_join"
)

@defer.inlineCallbacks
def lookup_room_alias(self, room_alias):
async def lookup_room_alias(self, room_alias):
"""
Get the room ID associated with a room alias.

Expand All @@ -672,7 +667,7 @@ def lookup_room_alias(self, room_alias):
SynapseError if room alias could not be found.
"""
directory_handler = self.directory_handler
mapping = yield directory_handler.get_association(room_alias)
mapping = await directory_handler.get_association(room_alias)

if not mapping:
raise SynapseError(404, "No such room alias")
Expand All @@ -687,9 +682,8 @@ def lookup_room_alias(self, room_alias):

return RoomID.from_string(room_id), servers

@defer.inlineCallbacks
def _get_inviter(self, user_id, room_id):
invite = yield self.store.get_invite_for_local_user_in_room(
async def _get_inviter(self, user_id, room_id):
invite = await self.store.get_invite_for_local_user_in_room(
user_id=user_id, room_id=room_id
)
if invite:
Expand Down Expand Up @@ -836,8 +830,7 @@ async def _make_and_store_3pid_invite(
txn_id=txn_id,
)

@defer.inlineCallbacks
def _is_host_in_room(self, current_state_ids):
async def _is_host_in_room(self, current_state_ids):
# Have we just created the room, and is this about to be the very
# first member event?
create_event_id = current_state_ids.get(("m.room.create", ""))
Expand All @@ -850,7 +843,7 @@ def _is_host_in_room(self, current_state_ids):
continue

event_id = current_state_ids[(etype, state_key)]
event = yield self.store.get_event(event_id, allow_none=True)
event = await self.store.get_event(event_id, allow_none=True)
if not event:
continue

Expand All @@ -859,11 +852,10 @@ def _is_host_in_room(self, current_state_ids):

return False

@defer.inlineCallbacks
def _is_server_notice_room(self, room_id):
async def _is_server_notice_room(self, room_id):
if self._server_notices_mxid is None:
return False
user_ids = yield self.store.get_users_in_room(room_id)
user_ids = await self.store.get_users_in_room(room_id)
return self._server_notices_mxid in user_ids


Expand Down Expand Up @@ -895,8 +887,7 @@ async def _is_remote_room_too_complex(self, room_id, remote_room_hosts):
return complexity["v1"] > max_complexity
return None

@defer.inlineCallbacks
def _is_local_room_too_complex(self, room_id):
async def _is_local_room_too_complex(self, room_id):
"""
Check if the complexity of a local room is too great.

Expand All @@ -906,7 +897,7 @@ def _is_local_room_too_complex(self, room_id):
Returns: bool
"""
max_complexity = self.hs.config.limit_remote_rooms.complexity
complexity = yield self.store.get_room_complexity(room_id)
complexity = await self.store.get_room_complexity(room_id)

return complexity["v1"] > max_complexity

Expand Down Expand Up @@ -969,18 +960,15 @@ async def _remote_join(self, requester, remote_room_hosts, room_id, user, conten
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
)

@defer.inlineCallbacks
def _remote_reject_invite(
async def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
try:
ret = yield defer.ensureDeferred(
fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string(), content=content,
)
ret = await fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string(), content=content,
)
return ret
except Exception as e:
Expand All @@ -992,24 +980,23 @@ def _remote_reject_invite(
#
logger.warning("Failed to reject invite: %s", e)

yield self.store.locally_reject_invite(target.to_string(), room_id)
await self.store.locally_reject_invite(target.to_string(), room_id)
return {}

def _user_joined_room(self, target, room_id):
async def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
return defer.succeed(user_joined_room(self.distributor, target, room_id))
return user_joined_room(self.distributor, target, room_id)

def _user_left_room(self, target, room_id):
async def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room
"""
return defer.succeed(user_left_room(self.distributor, target, room_id))
return user_left_room(self.distributor, target, room_id)

@defer.inlineCallbacks
def forget(self, user, room_id):
async def forget(self, user, room_id):
user_id = user.to_string()

member = yield self.state_handler.get_current_state(
member = await self.state_handler.get_current_state(
room_id=room_id, event_type=EventTypes.Member, state_key=user_id
)
membership = member.membership if member else None
Expand All @@ -1021,4 +1008,4 @@ def forget(self, user, room_id):
raise SynapseError(400, "User %s in room %s" % (user_id, room_id))

if membership:
yield self.store.forget(user_id, room_id)
await self.store.forget(user_id, room_id)
21 changes: 9 additions & 12 deletions synapse/handlers/room_member_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import (
Expand All @@ -36,48 +34,47 @@ def __init__(self, hs):
self._remote_reject_client = ReplRejectInvite.make_client(hs)
self._notify_change_client = ReplJoinedLeft.make_client(hs)

@defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
async def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")

ret = yield self._remote_join_client(
ret = await self._remote_join_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
user_id=user.to_string(),
content=content,
)

yield self._user_joined_room(user, room_id)
await self._user_joined_room(user, room_id)

return ret

def _remote_reject_invite(
async def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
"""Implements RoomMemberHandler._remote_reject_invite
"""
return self._remote_reject_client(
return await self._remote_reject_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
user_id=target.to_string(),
content=content,
)

def _user_joined_room(self, target, room_id):
async def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
return self._notify_change_client(
return await self._notify_change_client(
user_id=target.to_string(), room_id=room_id, change="joined"
)

def _user_left_room(self, target, room_id):
async def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room
"""
return self._notify_change_client(
return await self._notify_change_client(
user_id=target.to_string(), room_id=room_id, change="left"
)