Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add tests for last_successful_stream_ordering (#8258)
Browse files Browse the repository at this point in the history
  • Loading branch information
reivilibre authored Sep 7, 2020
1 parent 77b4711 commit 765437d
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/8258.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage.
76 changes: 76 additions & 0 deletions tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ def prepare(self, reactor, clock, hs):
return_value=make_awaitable(["test", "host2"])
)

# whenever send_transaction is called, record the pdu data
self.pdus = []
self.failed_pdus = []
self.is_online = True
self.hs.get_federation_transport_client().send_transaction.side_effect = (
self.record_transaction
)

async def record_transaction(self, txn, json_cb):
if self.is_online:
data = json_cb()
self.pdus.extend(data["pdus"])
return {}
else:
data = json_cb()
self.failed_pdus.extend(data["pdus"])
raise IOError("Failed to connect because this is a test!")

def get_destination_room(self, room: str, destination: str = "host2") -> dict:
"""
Gets the destination_rooms entry for a (destination, room_id) pair.
Expand Down Expand Up @@ -80,3 +98,61 @@ def test_catch_up_destination_rooms_tracking(self):
self.assertEqual(row_1["event_id"], event_id_1)
self.assertEqual(row_2["event_id"], event_id_2)
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)

@override_config({"send_federation": True})
def test_catch_up_last_successful_stream_ordering_tracking(self):
"""
Tests that we populate the `destination_rooms` table as needed.
"""
self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room = self.helper.create_room_as("u1", tok=u1_token)

# take the remote offline
self.is_online = False

self.get_success(
event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
)

self.helper.send(room, "wombats!", tok=u1_token)
self.pump()

lsso_1 = self.get_success(
self.hs.get_datastore().get_destination_last_successful_stream_ordering(
"host2"
)
)

self.assertIsNone(
lsso_1,
"There should be no last successful stream ordering for an always-offline destination",
)

# bring the remote online
self.is_online = True

event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]

lsso_2 = self.get_success(
self.hs.get_datastore().get_destination_last_successful_stream_ordering(
"host2"
)
)
row_2 = self.get_destination_room(room)

self.assertEqual(
self.pdus[0]["content"]["body"],
"rabbits!",
"Test fault: didn't receive the right PDU",
)
self.assertEqual(
row_2["event_id"],
event_id_2,
"Test fault: destination_rooms not updated correctly",
)
self.assertEqual(
lsso_2,
row_2["stream_ordering"],
"Send succeeded but not marked as last_successful_stream_ordering",
)

0 comments on commit 765437d

Please sign in to comment.