-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Add presence federation stream #9819
Changes from 10 commits
25cb535
885c3d9
14b263d
63c8516
ee9a5ce
28ba928
2571eeb
e4ec78b
d429733
a20a442
091f5da
e36cce5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add experimental support for handling presence on a worker. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
import abc | ||
import contextlib | ||
import logging | ||
from bisect import bisect | ||
from contextlib import contextmanager | ||
from typing import ( | ||
TYPE_CHECKING, | ||
|
@@ -53,7 +54,9 @@ | |
ReplicationBumpPresenceActiveTime, | ||
ReplicationPresenceSetState, | ||
) | ||
from synapse.replication.http.streams import ReplicationGetStreamUpdates | ||
from synapse.replication.tcp.commands import ClearUserSyncsCommand | ||
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream | ||
from synapse.state import StateHandler | ||
from synapse.storage.databases.main import DataStore | ||
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id | ||
|
@@ -127,10 +130,10 @@ def __init__(self, hs: "HomeServer"): | |
self.state = hs.get_state_handler() | ||
|
||
self._federation = None | ||
if hs.should_send_federation() or not hs.config.worker_app: | ||
if hs.should_send_federation(): | ||
self._federation = hs.get_federation_sender() | ||
|
||
self._send_federation = hs.should_send_federation() | ||
self._federation_queue = PresenceFederationQueue(hs, self) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's not entirely obvious to me that having a separate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mainly just to bundle up all the logic into one place TBH. (In the first PoC we only instantiated this on the presence writer process, but that changed). |
||
|
||
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled | ||
|
||
|
@@ -253,9 +256,17 @@ async def update_external_syncs_clear(self, process_id): | |
""" | ||
pass | ||
|
||
async def process_replication_rows(self, token, rows): | ||
"""Process presence stream rows received over replication.""" | ||
pass | ||
async def process_replication_rows( | ||
self, stream_name: str, instance_name: str, token: int, rows: list | ||
): | ||
"""Process streams received over replication.""" | ||
await self._federation_queue.process_replication_rows( | ||
stream_name, instance_name, token, rows | ||
) | ||
|
||
def get_federation_queue(self) -> "PresenceFederationQueue": | ||
"""Get the presence federation queue.""" | ||
return self._federation_queue | ||
|
||
async def maybe_send_presence_to_interested_destinations( | ||
self, states: List[UserPresenceState] | ||
|
@@ -264,12 +275,9 @@ async def maybe_send_presence_to_interested_destinations( | |
destinations that are interested. | ||
""" | ||
|
||
if not self._send_federation: | ||
if not self._federation: | ||
return | ||
|
||
# If this worker sends federation we must have a FederationSender. | ||
assert self._federation | ||
|
||
hosts_and_states = await get_interested_remotes( | ||
self.store, | ||
self.presence_router, | ||
|
@@ -421,7 +429,14 @@ async def notify_from_replication(self, states, stream_id): | |
# If this is a federation sender, notify about presence updates. | ||
await self.maybe_send_presence_to_interested_destinations(states) | ||
|
||
async def process_replication_rows(self, token, rows): | ||
async def process_replication_rows( | ||
self, stream_name: str, instance_name: str, token: int, rows: list | ||
): | ||
await super().process_replication_rows(stream_name, instance_name, token, rows) | ||
|
||
if stream_name != PresenceStream.NAME: | ||
return | ||
|
||
states = [ | ||
UserPresenceState( | ||
row.user_id, | ||
|
@@ -724,12 +739,10 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: | |
self.state, | ||
) | ||
|
||
# Since this is master we know that we have a federation sender or | ||
# queue, and so this will be defined. | ||
assert self._federation | ||
|
||
for destinations, states in hosts_and_states: | ||
self._federation.send_presence_to_destinations(states, destinations) | ||
self._federation_queue.send_presence_to_destinations( | ||
states, destinations | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @richvdh To continue the conversation from #9828 (comment), this is a similar shape as in |
||
|
||
async def _handle_timeouts(self): | ||
"""Checks the presence of users that have timed out and updates as | ||
|
@@ -1208,13 +1221,9 @@ async def _handle_state_delta(self, deltas): | |
user_presence_states | ||
) | ||
|
||
# Since this is master we know that we have a federation sender or | ||
# queue, and so this will be defined. | ||
assert self._federation | ||
|
||
# Send out user presence updates for each destination | ||
for destination, user_state_set in presence_destinations.items(): | ||
self._federation.send_presence_to_destinations( | ||
self._federation_queue.send_presence_to_destinations( | ||
destinations=[destination], states=user_state_set | ||
) | ||
|
||
|
@@ -1859,3 +1868,198 @@ async def get_interested_remotes( | |
hosts_and_states.append(([host], states)) | ||
|
||
return hosts_and_states | ||
|
||
|
||
class PresenceFederationQueue: | ||
"""Handles sending ad hoc presence updates over federation, which are *not* | ||
due to state updates (that get handled via the presence stream), e.g. | ||
federation pings and sending existing present states to newly joined hosts. | ||
|
||
Only the last N minutes will be queued, so if a federation sender instance | ||
is down for longer then some updates will be dropped. This is OK as presence | ||
is ephemeral, and so it will self correct eventually. | ||
|
||
On workers the class tracks the last received position of the stream from | ||
replication, and handles querying for missed updates over HTTP replication, | ||
c.f. `get_current_token` and `get_replication_rows`. | ||
""" | ||
|
||
# How long to keep entries in the queue for. Workers that are down for | ||
# longer than this duration will miss out on older updates. | ||
_KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000 | ||
|
||
# How often to check if we can expire entries from the queue. | ||
_CLEAR_ITEMS_EVERY_MS = 60 * 1000 | ||
|
||
def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler): | ||
self._clock = hs.get_clock() | ||
self._notifier = hs.get_notifier() | ||
self._instance_name = hs.get_instance_name() | ||
self._presence_handler = presence_handler | ||
self._repl_client = ReplicationGetStreamUpdates.make_client(hs) | ||
|
||
# Should we keep a queue of recent presence updates? We only bother if | ||
# another process may be handling federation sending. | ||
self._queue_presence_updates = True | ||
|
||
# Whether this instance is a presence writer. | ||
self._presence_writer = hs.config.worker.worker_app is None | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# The FederationSender instance, if this process sends federation traffic directly. | ||
self._federation = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is it called Small nitpick, resolve at will, I'm just pointing at the naming There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah wait, i see, it is an interface to send to federation through What is the actual type of this variable, though? Is there an interface type that describes this behaviour? (I'd like it to be annotated with it) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's an |
||
|
||
if hs.should_send_federation(): | ||
self._federation = hs.get_federation_sender() | ||
|
||
# We don't bother queuing up presence states if only this instance | ||
# is sending federation. | ||
if hs.config.worker.federation_shard_config.instances == [ | ||
self._instance_name | ||
]: | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._queue_presence_updates = False | ||
|
||
# The queue of recently queued updates as tuples of: `(timestamp, | ||
# stream_id, destinations, user_ids)`. We don't store the full states | ||
# for efficiency, and remote workers will already have the full states | ||
# cached. | ||
self._queue = [] # type: List[Tuple[int, int, Collection[str], Set[str]]] | ||
|
||
self._next_id = 1 | ||
|
||
# Map from instance name to current token | ||
self._current_tokens = {} # type: Dict[str, int] | ||
|
||
if self._queue_presence_updates: | ||
self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS) | ||
|
||
def _clear_queue(self): | ||
"""Clear out older entries from the queue.""" | ||
clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS | ||
|
||
# The queue is sorted by timestamp, so we can bisect to find the right | ||
# place to purge before. Note that we are searching using a 1-tuple with | ||
# the time, which does The Right Thing since the queue is a tuple where | ||
# the first item is a timestamp. | ||
index = bisect(self._queue, (clear_before,)) | ||
self._queue = self._queue[index:] | ||
|
||
def send_presence_to_destinations( | ||
self, states: Collection[UserPresenceState], destinations: Collection[str] | ||
) -> None: | ||
"""Send the presence states to the given destinations. | ||
|
||
Will forward to the local federation sender (if there is one) and queue | ||
to send over replication (if there are other federation sender instances.). | ||
|
||
Must only be called on the master process. | ||
""" | ||
|
||
# This should only be called on a presence writer. | ||
assert self._presence_writer | ||
|
||
if self._federation: | ||
self._federation.send_presence_to_destinations( | ||
states=states, | ||
destinations=destinations, | ||
) | ||
|
||
if not self._queue_presence_updates: | ||
return | ||
|
||
now = self._clock.time_msec() | ||
|
||
stream_id = self._next_id | ||
self._next_id += 1 | ||
|
||
self._queue.append((now, stream_id, destinations, {s.user_id for s in states})) | ||
|
||
self._notifier.notify_replication() | ||
|
||
def get_current_token(self, instance_name: str) -> int: | ||
"""Get the current position of the stream. | ||
|
||
On workers this returns the last stream ID received from replication. | ||
""" | ||
|
||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if instance_name == self._instance_name: | ||
return self._next_id - 1 | ||
else: | ||
return self._current_tokens.get(instance_name, 0) | ||
|
||
async def get_replication_rows( | ||
self, | ||
instance_name: str, | ||
from_token: int, | ||
upto_token: int, | ||
target_row_count: int, | ||
) -> Tuple[List[Tuple[int, Tuple[str, str]]], int, bool]: | ||
"""Get all the updates between the two tokens. | ||
|
||
We return rows in the form of `(destination, user_id)` to keep the size | ||
of each row bounded (rather than returning the sets in a row). | ||
|
||
On workers this will query the master process via HTTP replication. | ||
""" | ||
if instance_name != self._instance_name: | ||
# If not local we query over http replication from the master | ||
result = await self._repl_client( | ||
instance_name=instance_name, | ||
stream_name=PresenceFederationStream.NAME, | ||
from_token=from_token, | ||
upto_token=upto_token, | ||
) | ||
return result["updates"], result["upto_token"], result["limited"] | ||
|
||
# We can find the correct position in the queue by noting that there is | ||
# exactly one entry per stream ID, and that the last entry has an ID of | ||
# `self._next_id - 1`, so we can count backwards from the end. | ||
# | ||
# Since the start of the queue is periodically truncated we need to | ||
# handle the case where `from_token` stream ID has already been dropped. | ||
start_idx = max(from_token - self._next_id, -len(self._queue)) | ||
|
||
to_send = [] # type: List[Tuple[int, Tuple[str, str]]] | ||
limited = False | ||
new_id = upto_token | ||
for _, stream_id, destinations, user_ids in self._queue[start_idx:]: | ||
if stream_id > upto_token: | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
break | ||
|
||
new_id = stream_id | ||
|
||
to_send.extend( | ||
(stream_id, (destination, user_id)) | ||
for destination in destinations | ||
for user_id in user_ids | ||
) | ||
|
||
if len(to_send) > target_row_count: | ||
limited = True | ||
break | ||
|
||
return to_send, new_id, limited | ||
|
||
async def process_replication_rows( | ||
self, stream_name: str, instance_name: str, token: int, rows: list | ||
): | ||
if stream_name != PresenceFederationStream.NAME: | ||
return | ||
|
||
# We keep track of the current tokens | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._current_tokens[instance_name] = token | ||
richvdh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# If we're a federation sender we pull out the presence states to send | ||
# and forward them on. | ||
if not self._federation: | ||
return | ||
|
||
hosts_to_users = {} # type: Dict[str, Set[str]] | ||
for row in rows: | ||
hosts_to_users.setdefault(row.destination, set()).add(row.user_id) | ||
|
||
for host, user_ids in hosts_to_users.items(): | ||
states = await self._presence_handler.current_state_for_users(user_ids) | ||
self._federation.send_presence_to_destinations( | ||
states=states.values(), | ||
destinations=[host], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we remove the federation send queue I plan on making
hs.get_federation_sender()
return an optional federation sender, which will simplify this even more.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by "federation send queue" you mean the
FederationRemoteSendQueue
? Don't we need that for device list updates too?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do mean that yes. The device list updates carefully call the queue, but the queue just no ops. I have a branch that rips the send queue out entirely and everything still works fine.