From 765437df54a0e74f37a2c65515f54755096eeecd Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Mon, 7 Sep 2020 10:11:38 +0100 Subject: [PATCH 1/3] Add tests for `last_successful_stream_ordering` (#8258) --- changelog.d/8258.misc | 1 + tests/federation/test_federation_catch_up.py | 76 ++++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 changelog.d/8258.misc diff --git a/changelog.d/8258.misc b/changelog.d/8258.misc new file mode 100644 index 000000000000..3c27803be45f --- /dev/null +++ b/changelog.d/8258.misc @@ -0,0 +1 @@ +Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage. diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 73c51c9d6caa..6cdcc378f098 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -28,6 +28,24 @@ def prepare(self, reactor, clock, hs): return_value=make_awaitable(["test", "host2"]) ) + # whenever send_transaction is called, record the pdu data + self.pdus = [] + self.failed_pdus = [] + self.is_online = True + self.hs.get_federation_transport_client().send_transaction.side_effect = ( + self.record_transaction + ) + + async def record_transaction(self, txn, json_cb): + if self.is_online: + data = json_cb() + self.pdus.extend(data["pdus"]) + return {} + else: + data = json_cb() + self.failed_pdus.extend(data["pdus"]) + raise IOError("Failed to connect because this is a test!") + def get_destination_room(self, room: str, destination: str = "host2") -> dict: """ Gets the destination_rooms entry for a (destination, room_id) pair. @@ -80,3 +98,61 @@ def test_catch_up_destination_rooms_tracking(self): self.assertEqual(row_1["event_id"], event_id_1) self.assertEqual(row_2["event_id"], event_id_2) self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1) + + @override_config({"send_federation": True}) + def test_catch_up_last_successful_stream_ordering_tracking(self): + """ + Tests that we populate the `destination_rooms` table as needed. + """ + self.register_user("u1", "you the one") + u1_token = self.login("u1", "you the one") + room = self.helper.create_room_as("u1", tok=u1_token) + + # take the remote offline + self.is_online = False + + self.get_success( + event_injection.inject_member_event(self.hs, room, "@user:host2", "join") + ) + + self.helper.send(room, "wombats!", tok=u1_token) + self.pump() + + lsso_1 = self.get_success( + self.hs.get_datastore().get_destination_last_successful_stream_ordering( + "host2" + ) + ) + + self.assertIsNone( + lsso_1, + "There should be no last successful stream ordering for an always-offline destination", + ) + + # bring the remote online + self.is_online = True + + event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"] + + lsso_2 = self.get_success( + self.hs.get_datastore().get_destination_last_successful_stream_ordering( + "host2" + ) + ) + row_2 = self.get_destination_room(room) + + self.assertEqual( + self.pdus[0]["content"]["body"], + "rabbits!", + "Test fault: didn't receive the right PDU", + ) + self.assertEqual( + row_2["event_id"], + event_id_2, + "Test fault: destination_rooms not updated correctly", + ) + self.assertEqual( + lsso_2, + row_2["stream_ordering"], + "Send succeeded but not marked as last_successful_stream_ordering", + ) From 5b452df23b02fa98d0277e79fa6154af981c4f3a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 7 Sep 2020 11:41:50 +0100 Subject: [PATCH 2/3] Run database updates in a transaction (#8265) Fixes: #6467 --- changelog.d/8265.bugfix | 1 + synapse/storage/prepare_database.py | 27 ++++++++++++++++++++++----- 2 files changed, 23 insertions(+), 5 deletions(-) create mode 100644 changelog.d/8265.bugfix diff --git a/changelog.d/8265.bugfix b/changelog.d/8265.bugfix new file mode 100644 index 000000000000..981a836d218c --- /dev/null +++ b/changelog.d/8265.bugfix @@ -0,0 +1 @@ +Fix logstanding bug which could lead to incomplete database upgrades on SQLite. diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 964d8d9eb876..229acb2da7a9 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -19,12 +19,15 @@ import os import re from collections import Counter -from typing import TextIO +from typing import Optional, TextIO import attr +from synapse.config.homeserver import HomeServerConfig +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.engines.postgres import PostgresEngine -from synapse.storage.types import Cursor +from synapse.storage.types import Connection, Cursor +from synapse.types import Collection logger = logging.getLogger(__name__) @@ -47,7 +50,12 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass -def prepare_database(db_conn, database_engine, config, databases=["main", "state"]): +def prepare_database( + db_conn: Connection, + database_engine: BaseDatabaseEngine, + config: Optional[HomeServerConfig], + databases: Collection[str] = ["main", "state"], +): """Prepares a physical database for usage. Will either create all necessary tables or upgrade from an older schema version. @@ -57,15 +65,24 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state Args: db_conn: database_engine: - config (synapse.config.homeserver.HomeServerConfig|None): + config : application config, or None if we are connecting to an existing database which we expect to be configured already - databases (list[str]): The name of the databases that will be used + databases: The name of the databases that will be used with this physical database. Defaults to all databases. """ try: cur = db_conn.cursor() + + # sqlite does not automatically start transactions for DDL / SELECT statements, + # so we start one before running anything. This ensures that any upgrades + # are either applied completely, or not at all. + # + # (psycopg2 automatically starts a transaction as soon as we run any statements + # at all, so this is redundant but harmless there.) + cur.execute("BEGIN TRANSACTION") + version_info = _get_or_create_schema_state(cur, database_engine) if version_info: From 7586fdf1e839a764e9ce19b8c457dbd9367a5c25 Mon Sep 17 00:00:00 2001 From: Alexandre Morignot Date: Mon, 7 Sep 2020 13:21:38 +0200 Subject: [PATCH 3/3] Bump canonicaljson to version 1.4.0 (#8262) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The version 1.3.0 has a bug with unicode charecters: ``` >>> from canonicaljson import encode_pretty_printed_json >>> encode_pretty_printed_json({'a': 'à'}) Traceback (most recent call last): File "", line 1, in File "/home/erdnaxeli/.pyenv/versions/3.6.7/lib/python3.6/site-packages/canonicaljson.py", line 96, in encode_pretty_printed_json return _pretty_encoder.encode(json_object).encode("ascii") UnicodeEncodeError: 'ascii' codec can't encode character '\xe0' in position 12: ordinal not in range(128) ``` Signed-off-by: Alexandre Morignot Co-authored-by: Alexandre Morignot --- changelog.d/8262.bugfix | 1 + synapse/python_dependencies.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/8262.bugfix diff --git a/changelog.d/8262.bugfix b/changelog.d/8262.bugfix new file mode 100644 index 000000000000..2b84927de3ee --- /dev/null +++ b/changelog.d/8262.bugfix @@ -0,0 +1 @@ +Upgrade canonicaljson to version 1.4.0 to fix an unicode encoding issue. diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 2d995ec456a5..ff0c67228bea 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -43,7 +43,7 @@ "jsonschema>=2.5.1", "frozendict>=1", "unpaddedbase64>=1.1.0", - "canonicaljson>=1.3.0", + "canonicaljson>=1.4.0", # we use the type definitions added in signedjson 1.1. "signedjson>=1.1.0", "pynacl>=1.2.1",