Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Catch-up after Federation Outage (bonus): Catch-up on Synapse Startup (
Browse files Browse the repository at this point in the history
…#8322)

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>

* Fix _set_destination_retry_timings

This came about because the code assumed that retry_interval
could not be NULL — which has been challenged by catch-up.
  • Loading branch information
reivilibre authored Sep 18, 2020
1 parent 8a4a418 commit 36efbca
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog.d/8230.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
1 change: 0 additions & 1 deletion changelog.d/8230.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8247.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
1 change: 0 additions & 1 deletion changelog.d/8247.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8258.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
1 change: 0 additions & 1 deletion changelog.d/8258.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8322.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
51 changes: 51 additions & 0 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
"Total number of PDUs queued for sending across all destinations",
)

# Time (in s) after Synapse's startup that we will begin to wake up destinations
# that have catch-up outstanding.
CATCH_UP_STARTUP_DELAY_SEC = 15

# Time (in s) to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds after Synapse's startup until we have woken
# every destination has outstanding catch-up.
CATCH_UP_STARTUP_INTERVAL_SEC = 5


class FederationSender:
def __init__(self, hs: "synapse.server.HomeServer"):
Expand Down Expand Up @@ -125,6 +134,14 @@ def __init__(self, hs: "synapse.server.HomeServer"):
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
)

# wake up destinations that have outstanding PDUs to be caught up
self._catchup_after_startup_timer = self.clock.call_later(
CATCH_UP_STARTUP_DELAY_SEC,
run_as_background_process,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)

def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
Expand Down Expand Up @@ -560,3 +577,37 @@ async def get_replication_rows(
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return [], 0, False

async def _wake_destinations_needing_catchup(self):
"""
Wakes up destinations that need catch-up and are not currently being
backed off from.
In order to reduce load spikes, adds a delay between each destination.
"""

last_processed = None # type: Optional[str]

while True:
destinations_to_wake = await self.store.get_catch_up_outstanding_destinations(
last_processed
)

if not destinations_to_wake:
# finished waking all destinations!
self._catchup_after_startup_timer = None
break

destinations_to_wake = [
d
for d in destinations_to_wake
if self._federation_shard_config.should_handle(self._instance_name, d)
]

for last_processed in destinations_to_wake:
logger.info(
"Destination %s has outstanding catch-up, waking up.",
last_processed,
)
self.wake_destination(last_processed)
await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
66 changes: 64 additions & 2 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def _set_destination_retry_timings(
retry_interval = EXCLUDED.retry_interval
WHERE
EXCLUDED.retry_interval = 0
OR destinations.retry_interval IS NULL
OR destinations.retry_interval < EXCLUDED.retry_interval
"""

Expand Down Expand Up @@ -249,7 +250,11 @@ def _set_destination_retry_timings(
"retry_interval": retry_interval,
},
)
elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
elif (
retry_interval == 0
or prev_row["retry_interval"] is None
or prev_row["retry_interval"] < retry_interval
):
self.db_pool.simple_update_one_txn(
txn,
"destinations",
Expand Down Expand Up @@ -397,7 +402,7 @@ async def get_catch_up_room_event_ids(

@staticmethod
def _get_catch_up_room_event_ids_txn(
txn, destination: str, last_successful_stream_ordering: int,
txn: LoggingTransaction, destination: str, last_successful_stream_ordering: int,
) -> List[str]:
q = """
SELECT event_id FROM destination_rooms
Expand All @@ -412,3 +417,60 @@ def _get_catch_up_room_event_ids_txn(
)
event_ids = [row[0] for row in txn]
return event_ids

async def get_catch_up_outstanding_destinations(
self, after_destination: Optional[str]
) -> List[str]:
"""
Gets at most 25 destinations which have outstanding PDUs to be caught up,
and are not being backed off from
Args:
after_destination:
If provided, all destinations must be lexicographically greater
than this one.
Returns:
list of up to 25 destinations with outstanding catch-up.
These are the lexicographically first destinations which are
lexicographically greater than after_destination (if provided).
"""
time = self.hs.get_clock().time_msec()

return await self.db_pool.runInteraction(
"get_catch_up_outstanding_destinations",
self._get_catch_up_outstanding_destinations_txn,
time,
after_destination,
)

@staticmethod
def _get_catch_up_outstanding_destinations_txn(
txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
) -> List[str]:
q = """
SELECT destination FROM destinations
WHERE destination IN (
SELECT destination FROM destination_rooms
WHERE destination_rooms.stream_ordering >
destinations.last_successful_stream_ordering
)
AND destination > ?
AND (
retry_last_ts IS NULL OR
retry_last_ts + retry_interval < ?
)
ORDER BY destination
LIMIT 25
"""
txn.execute(
q,
(
# everything is lexicographically greater than "" so this gives
# us the first batch of up to 25.
after_destination or "",
now_time_ms,
),
)

destinations = [row[0] for row in txn]
return destinations
99 changes: 99 additions & 0 deletions tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,102 @@ def test_catch_up_loop(self):
per_dest_queue._last_successful_stream_ordering,
event_5.internal_metadata.stream_ordering,
)

@override_config({"send_federation": True})
def test_catch_up_on_synapse_startup(self):
"""
Tests the behaviour of get_catch_up_outstanding_destinations and
_wake_destinations_needing_catchup.
"""

# list of sorted server names (note that there are more servers than the batch
# size used in get_catch_up_outstanding_destinations).
server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"]

# ARRANGE:
# - a local user (u1)
# - a room which u1 is joined to (and remote users @user:serverXX are
# joined to)

# mark the remotes as online
self.is_online = True

self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room_id = self.helper.create_room_as("u1", tok=u1_token)

for server_name in server_names:
self.get_success(
event_injection.inject_member_event(
self.hs, room_id, "@user:%s" % server_name, "join"
)
)

# create an event
self.helper.send(room_id, "deary me!", tok=u1_token)

# ASSERT:
# - All servers are up to date so none should have outstanding catch-up
outstanding_when_successful = self.get_success(
self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
)
self.assertEqual(outstanding_when_successful, [])

# ACT:
# - Make the remote servers unreachable
self.is_online = False

# - Mark zzzerver as being backed-off from
now = self.clock.time_msec()
self.get_success(
self.hs.get_datastore().set_destination_retry_timings(
"zzzerver", now, now, 24 * 60 * 60 * 1000 # retry in 1 day
)
)

# - Send an event
self.helper.send(room_id, "can anyone hear me?", tok=u1_token)

# ASSERT (get_catch_up_outstanding_destinations):
# - all remotes are outstanding
# - they are returned in batches of 25, in order
outstanding_1 = self.get_success(
self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
)

self.assertEqual(len(outstanding_1), 25)
self.assertEqual(outstanding_1, server_names[0:25])

outstanding_2 = self.get_success(
self.hs.get_datastore().get_catch_up_outstanding_destinations(
outstanding_1[-1]
)
)
self.assertNotIn("zzzerver", outstanding_2)
self.assertEqual(len(outstanding_2), 17)
self.assertEqual(outstanding_2, server_names[25:-1])

# ACT: call _wake_destinations_needing_catchup

# patch wake_destination to just count the destinations instead
woken = []

def wake_destination_track(destination):
woken.append(destination)

self.hs.get_federation_sender().wake_destination = wake_destination_track

# cancel the pre-existing timer for _wake_destinations_needing_catchup
# this is because we are calling it manually rather than waiting for it
# to be called automatically
self.hs.get_federation_sender()._catchup_after_startup_timer.cancel()

self.get_success(
self.hs.get_federation_sender()._wake_destinations_needing_catchup(), by=5.0
)

# ASSERT (_wake_destinations_needing_catchup):
# - all remotes are woken up, save for zzzerver
self.assertNotIn("zzzerver", woken)
# - all destinations are woken exactly once; they appear once in woken.
self.assertCountEqual(woken, server_names[:-1])

0 comments on commit 36efbca

Please sign in to comment.