diff --git a/changelog.d/8258.misc b/changelog.d/8258.misc new file mode 100644 index 000000000000..3c27803be45f --- /dev/null +++ b/changelog.d/8258.misc @@ -0,0 +1 @@ +Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage. diff --git a/changelog.d/8262.bugfix b/changelog.d/8262.bugfix new file mode 100644 index 000000000000..2b84927de3ee --- /dev/null +++ b/changelog.d/8262.bugfix @@ -0,0 +1 @@ +Upgrade canonicaljson to version 1.4.0 to fix an unicode encoding issue. diff --git a/changelog.d/8265.bugfix b/changelog.d/8265.bugfix new file mode 100644 index 000000000000..981a836d218c --- /dev/null +++ b/changelog.d/8265.bugfix @@ -0,0 +1 @@ +Fix logstanding bug which could lead to incomplete database upgrades on SQLite. diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 2d995ec456a5..ff0c67228bea 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -43,7 +43,7 @@ "jsonschema>=2.5.1", "frozendict>=1", "unpaddedbase64>=1.1.0", - "canonicaljson>=1.3.0", + "canonicaljson>=1.4.0", # we use the type definitions added in signedjson 1.1. "signedjson>=1.1.0", "pynacl>=1.2.1", diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 964d8d9eb876..229acb2da7a9 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -19,12 +19,15 @@ import os import re from collections import Counter -from typing import TextIO +from typing import Optional, TextIO import attr +from synapse.config.homeserver import HomeServerConfig +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.engines.postgres import PostgresEngine -from synapse.storage.types import Cursor +from synapse.storage.types import Connection, Cursor +from synapse.types import Collection logger = logging.getLogger(__name__) @@ -47,7 +50,12 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass -def prepare_database(db_conn, database_engine, config, databases=["main", "state"]): +def prepare_database( + db_conn: Connection, + database_engine: BaseDatabaseEngine, + config: Optional[HomeServerConfig], + databases: Collection[str] = ["main", "state"], +): """Prepares a physical database for usage. Will either create all necessary tables or upgrade from an older schema version. @@ -57,15 +65,24 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state Args: db_conn: database_engine: - config (synapse.config.homeserver.HomeServerConfig|None): + config : application config, or None if we are connecting to an existing database which we expect to be configured already - databases (list[str]): The name of the databases that will be used + databases: The name of the databases that will be used with this physical database. Defaults to all databases. """ try: cur = db_conn.cursor() + + # sqlite does not automatically start transactions for DDL / SELECT statements, + # so we start one before running anything. This ensures that any upgrades + # are either applied completely, or not at all. + # + # (psycopg2 automatically starts a transaction as soon as we run any statements + # at all, so this is redundant but harmless there.) + cur.execute("BEGIN TRANSACTION") + version_info = _get_or_create_schema_state(cur, database_engine) if version_info: diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 73c51c9d6caa..6cdcc378f098 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -28,6 +28,24 @@ def prepare(self, reactor, clock, hs): return_value=make_awaitable(["test", "host2"]) ) + # whenever send_transaction is called, record the pdu data + self.pdus = [] + self.failed_pdus = [] + self.is_online = True + self.hs.get_federation_transport_client().send_transaction.side_effect = ( + self.record_transaction + ) + + async def record_transaction(self, txn, json_cb): + if self.is_online: + data = json_cb() + self.pdus.extend(data["pdus"]) + return {} + else: + data = json_cb() + self.failed_pdus.extend(data["pdus"]) + raise IOError("Failed to connect because this is a test!") + def get_destination_room(self, room: str, destination: str = "host2") -> dict: """ Gets the destination_rooms entry for a (destination, room_id) pair. @@ -80,3 +98,61 @@ def test_catch_up_destination_rooms_tracking(self): self.assertEqual(row_1["event_id"], event_id_1) self.assertEqual(row_2["event_id"], event_id_2) self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1) + + @override_config({"send_federation": True}) + def test_catch_up_last_successful_stream_ordering_tracking(self): + """ + Tests that we populate the `destination_rooms` table as needed. + """ + self.register_user("u1", "you the one") + u1_token = self.login("u1", "you the one") + room = self.helper.create_room_as("u1", tok=u1_token) + + # take the remote offline + self.is_online = False + + self.get_success( + event_injection.inject_member_event(self.hs, room, "@user:host2", "join") + ) + + self.helper.send(room, "wombats!", tok=u1_token) + self.pump() + + lsso_1 = self.get_success( + self.hs.get_datastore().get_destination_last_successful_stream_ordering( + "host2" + ) + ) + + self.assertIsNone( + lsso_1, + "There should be no last successful stream ordering for an always-offline destination", + ) + + # bring the remote online + self.is_online = True + + event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"] + + lsso_2 = self.get_success( + self.hs.get_datastore().get_destination_last_successful_stream_ordering( + "host2" + ) + ) + row_2 = self.get_destination_room(room) + + self.assertEqual( + self.pdus[0]["content"]["body"], + "rabbits!", + "Test fault: didn't receive the right PDU", + ) + self.assertEqual( + row_2["event_id"], + event_id_2, + "Test fault: destination_rooms not updated correctly", + ) + self.assertEqual( + lsso_2, + row_2["stream_ordering"], + "Send succeeded but not marked as last_successful_stream_ordering", + )