Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Remove some unused distributor signals (#8216)
Browse files Browse the repository at this point in the history
Removes the `user_joined_room` and stops calling it since there are no observers.

Also cleans-up some other unused signals and related code.
  • Loading branch information
clokep authored Sep 9, 2020
1 parent c9dbee5 commit 2ea1c68
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 141 deletions.
1 change: 1 addition & 0 deletions changelog.d/8216.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Simplify the distributor code to avoid unnecessary work.
4 changes: 0 additions & 4 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ class EventStreamHandler(BaseHandler):
def __init__(self, hs: "HomeServer"):
super(EventStreamHandler, self).__init__(hs)

self.distributor = hs.get_distributor()
self.distributor.declare("started_user_eventstream")
self.distributor.declare("stopped_user_eventstream")

self.clock = hs.get_clock()

self.notifier = hs.get_notifier()
Expand Down
43 changes: 1 addition & 42 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
Expand All @@ -80,7 +79,6 @@
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
Expand Down Expand Up @@ -141,9 +139,6 @@ def __init__(self, hs):
self._replication = hs.get_replication_data_handler()

self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
hs
)
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
)
Expand Down Expand Up @@ -704,31 +699,10 @@ async def _process_received_pdu(
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)

try:
context = await self._handle_new_event(origin, event, state=state)
await self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)

if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True

prev_state_ids = await context.get_prev_state_ids()

prev_state_id = prev_state_ids.get((event.type, event.state_key))
if prev_state_id:
prev_state = await self.store.get_event(
prev_state_id, allow_none=True
)
if prev_state and prev_state.membership == Membership.JOIN:
newly_joined = False

if newly_joined:
user = UserID.from_string(event.state_key)
await self.user_joined_room(user, room_id)

# For encrypted messages we check that we know about the sending device,
# if we don't then we mark the device cache for that user as stale.
if event.type == EventTypes.Encrypted:
Expand Down Expand Up @@ -1550,11 +1524,6 @@ async def on_send_join_request(self, origin, pdu):
event.signatures,
)

if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
await self.user_joined_room(user, event.room_id)

prev_state_ids = await context.get_prev_state_ids()

state_ids = list(prev_state_ids.values())
Expand Down Expand Up @@ -2984,16 +2953,6 @@ async def _clean_room_for_join(self, room_id: str) -> None:
else:
await self.store.clean_room_for_join(room_id)

async def user_joined_room(self, user: UserID, room_id: str) -> None:
"""Called when a new user has joined the room
"""
if self.config.worker_app:
await self._notify_user_membership_change(
room_id=room_id, user_id=user.to_string(), change="joined"
)
else:
user_joined_room(self.distributor, user, room_id)

async def get_room_complexity(
self, remote_room_hosts: List[str], room_id: str
) -> Optional[dict]:
Expand Down
42 changes: 4 additions & 38 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
from synapse.util.distributor import user_left_room

from ._base import BaseHandler

Expand Down Expand Up @@ -148,17 +148,6 @@ async def remote_reject_invite(
"""
raise NotImplementedError()

@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
room.
Args:
target
room_id
"""
raise NotImplementedError()

@abc.abstractmethod
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has left the
Expand Down Expand Up @@ -221,7 +210,6 @@ async def _local_membership_update(

prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)

newly_joined = False
if event.membership == Membership.JOIN:
newly_joined = True
if prev_member_event_id:
Expand All @@ -246,12 +234,7 @@ async def _local_membership_update(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)

if event.membership == Membership.JOIN and newly_joined:
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
await self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
Expand Down Expand Up @@ -726,17 +709,7 @@ async def send_membership_event(
(EventTypes.Member, event.state_key), None
)

if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
await self._user_joined_room(target_user, room_id)
elif event.membership == Membership.LEAVE:
if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
Expand Down Expand Up @@ -1002,10 +975,9 @@ async def _is_server_notice_room(self, room_id: str) -> bool:

class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
super(RoomMemberMasterHandler, self).__init__(hs)
super().__init__(hs)

self.distributor = hs.get_distributor()
self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")

async def _is_remote_room_too_complex(
Expand Down Expand Up @@ -1085,7 +1057,6 @@ async def _remote_join(
event_id, stream_id = await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
)
await self._user_joined_room(user, room_id)

# Check the room we just joined wasn't too large, if we didn't fetch the
# complexity of it before.
Expand Down Expand Up @@ -1228,11 +1199,6 @@ async def _locally_reject_invite(
)
return event.event_id, stream_id

async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
"""
user_joined_room(self.distributor, target, room_id)

async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
Expand Down
9 changes: 0 additions & 9 deletions synapse/handlers/room_member_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ async def _remote_join(
content=content,
)

await self._user_joined_room(user, room_id)

return ret["event_id"], ret["stream_id"]

async def remote_reject_invite(
Expand All @@ -81,13 +79,6 @@ async def remote_reject_invite(
)
return ret["event_id"], ret["stream_id"]

async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
"""
await self._notify_change_client(
user_id=target.to_string(), room_id=room_id, change="joined"
)

async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
Expand Down
10 changes: 4 additions & 6 deletions synapse/replication/http/membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict, Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room
from synapse.util.distributor import user_left_room

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -181,9 +181,9 @@ async def _serialize_payload(room_id, user_id, change):
Args:
room_id (str)
user_id (str)
change (str): Either "joined" or "left"
change (str): "left"
"""
assert change in ("joined", "left")
assert change == "left"

return {}

Expand All @@ -192,9 +192,7 @@ def _handle_request(self, request, room_id, user_id, change):

user = UserID.from_string(user_id)

if change == "joined":
user_joined_room(self.distributor, user, room_id)
elif change == "left":
if change == "left":
user_left_room(self.distributor, user, room_id)
else:
raise Exception("Unrecognized change: %r", change)
Expand Down
50 changes: 8 additions & 42 deletions synapse/util/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import logging

from twisted.internet import defer
from twisted.internet.defer import Deferred, fail, succeed
from twisted.python import failure

from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
Expand All @@ -29,11 +27,6 @@ def user_left_room(distributor, user, room_id):
distributor.fire("user_left_room", user=user, room_id=room_id)


# XXX: this is no longer used. We should probably kill it.
def user_joined_room(distributor, user, room_id):
distributor.fire("user_joined_room", user=user, room_id=room_id)


class Distributor:
"""A central dispatch point for loosely-connected pieces of code to
register, observe, and fire signals.
Expand Down Expand Up @@ -81,28 +74,6 @@ def fire(self, name, *args, **kwargs):
run_as_background_process(name, self.signals[name].fire, *args, **kwargs)


def maybeAwaitableDeferred(f, *args, **kw):
"""
Invoke a function that may or may not return a Deferred or an Awaitable.
This is a modified version of twisted.internet.defer.maybeDeferred.
"""
try:
result = f(*args, **kw)
except Exception:
return fail(failure.Failure(captureVars=Deferred.debug))

if isinstance(result, Deferred):
return result
# Handle the additional case of an awaitable being returned.
elif inspect.isawaitable(result):
return defer.ensureDeferred(result)
elif isinstance(result, failure.Failure):
return fail(result)
else:
return succeed(result)


class Signal:
"""A Signal is a dispatch point that stores a list of callables as
observers of it.
Expand Down Expand Up @@ -132,22 +103,17 @@ def fire(self, *args, **kwargs):
Returns a Deferred that will complete when all the observers have
completed."""

def do(observer):
def eb(failure):
async def do(observer):
try:
result = observer(*args, **kwargs)
if inspect.isawaitable(result):
result = await result
return result
except Exception as e:
logger.warning(
"%s signal observer %s failed: %r",
self.name,
observer,
failure,
exc_info=(
failure.type,
failure.value,
failure.getTracebackObject(),
),
"%s signal observer %s failed: %r", self.name, observer, e,
)

return maybeAwaitableDeferred(observer, *args, **kwargs).addErrback(eb)

deferreds = [run_in_background(do, o) for o in self.observers]

return make_deferred_yieldable(
Expand Down

0 comments on commit 2ea1c68

Please sign in to comment.