diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0a62c62d02c3..239553ae138e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -367,13 +367,16 @@ jobs: - name: Set build result env: NEEDS_CONTEXT: ${{ toJSON(needs) }} - # the `jq` incantation dumps out a series of " " lines + # the `jq` incantation dumps out a series of " " lines. + # we set it to an intermediate variable to avoid a pipe, which makes it + # hard to set $rc. run: | - set -o pipefail - jq -r 'to_entries[] | [.key,.value.result] | join(" ")' \ - <<< $NEEDS_CONTEXT | - while read job result; do - if [ "$result" != "success" ]; then - echo "::set-failed ::Job $job returned $result" - fi - done + rc=0 + results=$(jq -r 'to_entries[] | [.key,.value.result] | join(" ")' <<< $NEEDS_CONTEXT) + while read job result ; do + if [ "$result" != "success" ]; then + echo "::set-failed ::Job $job returned $result" + rc=1 + fi + done <<< $results + exit $rc diff --git a/CHANGES.md b/CHANGES.md index b512d9ff3f4a..653324928145 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +Synapse 1.39.0 (2021-07-29) +=========================== + +No significant changes. + + Synapse 1.39.0rc3 (2021-07-28) ============================== @@ -19,10 +25,7 @@ Internal Changes Synapse 1.39.0rc2 (2021-07-22) ============================== -Bugfixes --------- - -- Always include `device_one_time_keys_count` key in `/sync` response to work around a bug in Element Android that broke encryption for new devices. ([\#10457](https://github.com/matrix-org/synapse/issues/10457)) +This release also includes the changes in v1.38.1. Internal Changes diff --git a/changelog.d/10390.misc b/changelog.d/10390.misc new file mode 100644 index 000000000000..911a5733eee8 --- /dev/null +++ b/changelog.d/10390.misc @@ -0,0 +1 @@ +Prune inbound federation inbound queues for a room if they get too large. diff --git a/changelog.d/10415.misc b/changelog.d/10415.misc new file mode 100644 index 000000000000..3b9501acbbcb --- /dev/null +++ b/changelog.d/10415.misc @@ -0,0 +1 @@ +Remove shebang line from module files. diff --git a/changelog.d/10439.bugfix b/changelog.d/10439.bugfix new file mode 100644 index 000000000000..74e5a25126d9 --- /dev/null +++ b/changelog.d/10439.bugfix @@ -0,0 +1 @@ +Fix events with floating outlier state being rejected over federation. diff --git a/changelog.d/10440.feature b/changelog.d/10440.feature new file mode 100644 index 000000000000..f1833b0bd726 --- /dev/null +++ b/changelog.d/10440.feature @@ -0,0 +1 @@ +Allow setting transaction limit for database connections. diff --git a/changelog.d/10447.feature b/changelog.d/10447.feature new file mode 100644 index 000000000000..df8bb51167f7 --- /dev/null +++ b/changelog.d/10447.feature @@ -0,0 +1 @@ +Update support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) to consider changes in the MSC around which servers can issue join events. diff --git a/changelog.d/10491.misc b/changelog.d/10491.misc new file mode 100644 index 000000000000..3867cf2682c1 --- /dev/null +++ b/changelog.d/10491.misc @@ -0,0 +1 @@ +Improve type annotations for `ObservableDeferred`. diff --git a/changelog.d/10499.bugfix b/changelog.d/10499.bugfix new file mode 100644 index 000000000000..6487af6c9691 --- /dev/null +++ b/changelog.d/10499.bugfix @@ -0,0 +1 @@ +Fix a bug which caused an explicit assignment of power-level 0 to a user to be misinterpreted in rare circumstances. diff --git a/changelog.d/10500.misc b/changelog.d/10500.misc new file mode 100644 index 000000000000..dbaff57364d2 --- /dev/null +++ b/changelog.d/10500.misc @@ -0,0 +1 @@ +Fix a bug which caused production debian packages to be incorrectly marked as 'prerelease'. diff --git a/changelog.d/10511.feature b/changelog.d/10511.feature new file mode 100644 index 000000000000..f1833b0bd726 --- /dev/null +++ b/changelog.d/10511.feature @@ -0,0 +1 @@ +Allow setting transaction limit for database connections. diff --git a/changelog.d/10512.misc b/changelog.d/10512.misc new file mode 100644 index 000000000000..c012e89f4bbb --- /dev/null +++ b/changelog.d/10512.misc @@ -0,0 +1 @@ +Update the `tests-done` Github Actions status. diff --git a/changelog.d/10513.feature b/changelog.d/10513.feature new file mode 100644 index 000000000000..153b2df7b205 --- /dev/null +++ b/changelog.d/10513.feature @@ -0,0 +1 @@ +Add a configuration setting for the time a `/sync` response is cached for. diff --git a/debian/changelog b/debian/changelog index ff21599df1dd..341c1ac99268 100644 --- a/debian/changelog +++ b/debian/changelog @@ -4,6 +4,12 @@ matrix-synapse-py3 (1.39.0ubuntu1) UNRELEASED; urgency=medium -- Richard van der Hoff Tue, 20 Jul 2021 00:10:03 +0100 +matrix-synapse-py3 (1.39.0) stable; urgency=medium + + * New synapse release 1.39.0. + + -- Synapse Packaging team Thu, 29 Jul 2021 09:59:00 +0100 + matrix-synapse-py3 (1.39.0~rc3) stable; urgency=medium * New synapse release 1.39.0~rc3. diff --git a/docker/build_debian.sh b/docker/build_debian.sh index f572ed9aa0ef..801ff454716c 100644 --- a/docker/build_debian.sh +++ b/docker/build_debian.sh @@ -11,10 +11,6 @@ DIST=`cut -d ':' -f2 <<< $distro` cp -aT /synapse/source /synapse/build cd /synapse/build -# add an entry to the changelog for this distribution -dch -M -l "+$DIST" "build for $DIST" -dch -M -r "" --force-distribution --distribution "$DIST" - # if this is a prerelease, set the Section accordingly. # # When the package is later added to the package repo, reprepro will use the @@ -23,11 +19,14 @@ dch -M -r "" --force-distribution --distribution "$DIST" DEB_VERSION=`dpkg-parsechangelog -SVersion` case $DEB_VERSION in - *rc*|*a*|*b*|*c*) + *~rc*|*~a*|*~b*|*~c*) sed -ie '/^Section:/c\Section: prerelease' debian/control ;; esac +# add an entry to the changelog for this distribution +dch -M -l "+$DIST" "build for $DIST" +dch -M -r "" --force-distribution --distribution "$DIST" dpkg-buildpackage -us -uc diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 853c2f68990f..a2efc14100ac 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -711,6 +711,15 @@ caches: # #expiry_time: 30m + # Controls how long the results of a /sync request are cached for after + # a successful response is returned. A higher duration can help clients with + # intermittent connections, at the cost of higher memory usage. + # + # By default, this is zero, which means that sync responses are not cached + # at all. + # + #sync_response_cache_duration: 2m + ## Database ## @@ -720,6 +729,9 @@ caches: # 'name' gives the database engine to use: either 'sqlite3' (for SQLite) or # 'psycopg2' (for PostgreSQL). # +# 'txn_limit' gives the maximum number of transactions to run per connection +# before reconnecting. Defaults to 0, which means no limit. +# # 'args' gives options which are passed through to the database engine, # except for options starting 'cp_', which are used to configure the Twisted # connection pool. For a reference to valid arguments, see: @@ -740,6 +752,7 @@ caches: # #database: # name: psycopg2 +# txn_limit: 10000 # args: # user: synapse_user # password: secretpassword diff --git a/synapse/__init__.py b/synapse/__init__.py index c9a445c8fe47..5da6c924fcec 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ except ImportError: pass -__version__ = "1.39.0rc3" +__version__ = "1.39.0" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/_scripts/review_recent_signups.py b/synapse/_scripts/review_recent_signups.py index 01dc0c42377f..9de913db889c 100644 --- a/synapse/_scripts/review_recent_signups.py +++ b/synapse/_scripts/review_recent_signups.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 2878d2c14077..3234d9ebba07 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2019 Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 2d50060ffb0b..de1bcee0a785 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 2d50060ffb0b..de1bcee0a785 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 57af28f10a57..885454ed44c0 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 2d50060ffb0b..de1bcee0a785 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 2d50060ffb0b..de1bcee0a785 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 2d50060ffb0b..de1bcee0a785 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index c3d49925189e..3b7131af8fa2 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # Copyright 2020 The Matrix.org Foundation C.I.C. # diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 920b34d97bf4..7dae163c1af3 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2014-2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd # diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 2d50060ffb0b..de1bcee0a785 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 2d50060ffb0b..de1bcee0a785 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2d50060ffb0b..de1bcee0a785 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index a368efb354ed..14bde2717943 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/config/cache.py b/synapse/config/cache.py index 8d5f38b5d934..d119427ad864 100644 --- a/synapse/config/cache.py +++ b/synapse/config/cache.py @@ -151,6 +151,15 @@ def generate_config_section(self, **kwargs): # entries are never evicted based on time. # #expiry_time: 30m + + # Controls how long the results of a /sync request are cached for after + # a successful response is returned. A higher duration can help clients with + # intermittent connections, at the cost of higher memory usage. + # + # By default, this is zero, which means that sync responses are not cached + # at all. + # + #sync_response_cache_duration: 2m """ def read_config(self, config, **kwargs): @@ -212,6 +221,10 @@ def read_config(self, config, **kwargs): else: self.expiry_time_msec = None + self.sync_response_cache_duration = self.parse_duration( + cache_config.get("sync_response_cache_duration", 0) + ) + # Resize all caches (if necessary) with the new factors we've loaded self.resize_all_caches() diff --git a/synapse/config/database.py b/synapse/config/database.py index 3d7d92f6152f..651e31b57621 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -33,6 +33,9 @@ # 'name' gives the database engine to use: either 'sqlite3' (for SQLite) or # 'psycopg2' (for PostgreSQL). # +# 'txn_limit' gives the maximum number of transactions to run per connection +# before reconnecting. Defaults to 0, which means no limit. +# # 'args' gives options which are passed through to the database engine, # except for options starting 'cp_', which are used to configure the Twisted # connection pool. For a reference to valid arguments, see: @@ -53,6 +56,7 @@ # #database: # name: psycopg2 +# txn_limit: 10000 # args: # user: synapse_user # password: secretpassword diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 0fa7ffc99fd4..4c92e9a2d40f 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -692,7 +692,7 @@ def get_user_power_level(user_id: str, auth_events: StateMap[EventBase]) -> int: power_level_event = get_power_level_event(auth_events) if power_level_event: level = power_level_event.content.get("users", {}).get(user_id) - if not level: + if level is None: level = power_level_event.content.get("users_default", 0) if level is None: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index dbadf102f2d7..b7a10da15a89 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -22,6 +22,7 @@ Awaitable, Callable, Collection, + Container, Dict, Iterable, List, @@ -513,6 +514,7 @@ async def _try_destination_list( description: str, destinations: Iterable[str], callback: Callable[[str], Awaitable[T]], + failover_errcodes: Optional[Container[str]] = None, failover_on_unknown_endpoint: bool = False, ) -> T: """Try an operation on a series of servers, until it succeeds @@ -533,6 +535,9 @@ async def _try_destination_list( next server tried. Normally the stacktrace is logged but this is suppressed if the exception is an InvalidResponseError. + failover_errcodes: Error codes (specific to this endpoint) which should + cause a failover when received as part of an HTTP 400 error. + failover_on_unknown_endpoint: if True, we will try other servers if it looks like a server doesn't support the endpoint. This is typically useful if the endpoint in question is new or experimental. @@ -544,6 +549,9 @@ async def _try_destination_list( SynapseError if the chosen remote server returns a 300/400 code, or no servers were reachable. """ + if failover_errcodes is None: + failover_errcodes = () + for destination in destinations: if destination == self.server_name: continue @@ -558,11 +566,17 @@ async def _try_destination_list( synapse_error = e.to_synapse_error() failover = False - # Failover on an internal server error, or if the destination - # doesn't implemented the endpoint for some reason. + # Failover should occur: + # + # * On internal server errors. + # * If the destination responds that it cannot complete the request. + # * If the destination doesn't implemented the endpoint for some reason. if 500 <= e.code < 600: failover = True + elif e.code == 400 and synapse_error.errcode in failover_errcodes: + failover = True + elif failover_on_unknown_endpoint and self._is_unknown_endpoint( e, synapse_error ): @@ -678,8 +692,20 @@ async def send_request(destination: str) -> Tuple[str, EventBase, RoomVersion]: return destination, ev, room_version + # MSC3083 defines additional error codes for room joins. Unfortunately + # we do not yet know the room version, assume these will only be returned + # by valid room versions. + failover_errcodes = ( + (Codes.UNABLE_AUTHORISE_JOIN, Codes.UNABLE_TO_GRANT_JOIN) + if membership == Membership.JOIN + else None + ) + return await self._try_destination_list( - "make_" + membership, destinations, send_request + "make_" + membership, + destinations, + send_request, + failover_errcodes=failover_errcodes, ) async def send_join( @@ -818,7 +844,14 @@ async def _execute(pdu: EventBase) -> None: origin=destination, ) + # MSC3083 defines additional error codes for room joins. + failover_errcodes = None if room_version.msc3083_join_rules: + failover_errcodes = ( + Codes.UNABLE_AUTHORISE_JOIN, + Codes.UNABLE_TO_GRANT_JOIN, + ) + # If the join is being authorised via allow rules, we need to send # the /send_join back to the same server that was originally used # with /make_join. @@ -827,7 +860,9 @@ async def _execute(pdu: EventBase) -> None: get_domain_from_id(pdu.content["join_authorised_via_users_server"]) ] - return await self._try_destination_list("send_join", destinations, send_request) + return await self._try_destination_list( + "send_join", destinations, send_request, failover_errcodes=failover_errcodes + ) async def _do_send_join( self, room_version: RoomVersion, destination: str, pdu: EventBase diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d6b5f7fc45fa..451f56c4f4cc 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1030,6 +1030,23 @@ async def _process_incoming_pdus_in_room_inner( origin, event = next + # Prune the event queue if it's getting large. + # + # We do this *after* handling the first event as the common case is + # that the queue is empty (/has the single event in), and so there's + # no need to do this check. + pruned = await self.store.prune_staged_events_in_room(room_id, room_version) + if pruned: + # If we have pruned the queue check we need to refetch the next + # event to handle. + next = await self.store.get_next_staged_event_for_room( + room_id, room_version + ) + if not next: + break + + origin, event = next + lock = await self.store.try_acquire_lock( _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f30bfcc93cf2..590642f510fe 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -269,14 +269,22 @@ def __init__(self, hs: "HomeServer"): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache( - hs.get_clock(), "sync" - ) self.state = hs.get_state_handler() self.auth = hs.get_auth() self.storage = hs.get_storage() self.state_store = self.storage.state + # TODO: flush cache entries on subsequent sync request. + # Once we get the next /sync request (ie, one with the same access token + # that sets 'since' to 'next_batch'), we know that device won't need a + # cached result any more, and we could flush the entry from the cache to save + # memory. + self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache( + hs.get_clock(), + "sync", + timeout_ms=hs.config.caches.sync_response_cache_duration, + ) + # ExpiringCache((User, Device)) -> LruCache(user_id => event_id) self.lazy_loaded_members_cache: ExpiringCache[ Tuple[str, Optional[str]], LruCache[str, str] diff --git a/synapse/notifier.py b/synapse/notifier.py index c5fbebc17d75..bbe337949ac5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -111,8 +111,9 @@ def __init__( self.last_notified_token = current_token self.last_notified_ms = time_now_ms - with PreserveLoggingContext(): - self.notify_deferred = ObservableDeferred(defer.Deferred()) + self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred( + defer.Deferred() + ) def notify( self, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 85621f33ef1e..a1436f39305d 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 4d4643619f68..c8015a384857 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -15,6 +15,7 @@ # limitations under the License. import logging import time +from collections import defaultdict from sys import intern from time import monotonic as monotonic_time from typing import ( @@ -397,6 +398,7 @@ def __init__( ): self.hs = hs self._clock = hs.get_clock() + self._txn_limit = database_config.config.get("txn_limit", 0) self._database_config = database_config self._db_pool = make_pool(hs.get_reactor(), database_config, engine) @@ -406,6 +408,9 @@ def __init__( self._current_txn_total_time = 0.0 self._previous_loop_ts = 0.0 + # Transaction counter: key is the twisted thread id, value is the current count + self._txn_counters: Dict[int, int] = defaultdict(int) + # TODO(paul): These can eventually be removed once the metrics code # is running in mainline, and we have some nice monitoring frontends # to watch it @@ -750,10 +755,26 @@ def inner_func(conn, *args, **kwargs): sql_scheduling_timer.observe(sched_duration_sec) context.add_database_scheduled(sched_duration_sec) + if self._txn_limit > 0: + tid = self._db_pool.threadID() + self._txn_counters[tid] += 1 + + if self._txn_counters[tid] > self._txn_limit: + logger.debug( + "Reconnecting database connection over transaction limit" + ) + conn.reconnect() + opentracing.log_kv( + {"message": "reconnected due to txn limit"} + ) + self._txn_counters[tid] = 1 + if self.engine.is_connection_closed(conn): logger.debug("Reconnecting closed database connection") conn.reconnect() opentracing.log_kv({"message": "reconnected"}) + if self._txn_limit > 0: + self._txn_counters[tid] = 1 try: if db_autocommit: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 547e43ab9844..44018c1c31ab 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -16,11 +16,11 @@ from queue import Empty, PriorityQueue from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple -from prometheus_client import Gauge +from prometheus_client import Counter, Gauge from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError -from synapse.api.room_versions import RoomVersion +from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -44,6 +44,12 @@ "The total number of events in the inbound federation staging", ) +pdus_pruned_from_federation_queue = Counter( + "synapse_federation_server_number_inbound_pdu_pruned", + "The number of events in the inbound federation staging that have been " + "pruned due to the queue getting too long", +) + logger = logging.getLogger(__name__) @@ -1277,6 +1283,100 @@ def _get_next_staged_event_for_room_txn(txn): return origin, event + async def prune_staged_events_in_room( + self, + room_id: str, + room_version: RoomVersion, + ) -> bool: + """Checks if there are lots of staged events for the room, and if so + prune them down. + + Returns: + Whether any events were pruned + """ + + # First check the size of the queue. + count = await self.db_pool.simple_select_one_onecol( + table="federation_inbound_events_staging", + keyvalues={"room_id": room_id}, + retcol="COALESCE(COUNT(*), 0)", + desc="prune_staged_events_in_room_count", + ) + + if count < 100: + return False + + # If the queue is too large, then we want clear the entire queue, + # keeping only the forward extremities (i.e. the events not referenced + # by other events in the queue). We do this so that we can always + # backpaginate in all the events we have dropped. + rows = await self.db_pool.simple_select_list( + table="federation_inbound_events_staging", + keyvalues={"room_id": room_id}, + retcols=("event_id", "event_json"), + desc="prune_staged_events_in_room_fetch", + ) + + # Find the set of events referenced by those in the queue, as well as + # collecting all the event IDs in the queue. + referenced_events: Set[str] = set() + seen_events: Set[str] = set() + for row in rows: + event_id = row["event_id"] + seen_events.add(event_id) + event_d = db_to_json(row["event_json"]) + + # We don't bother parsing the dicts into full blown event objects, + # as that is needlessly expensive. + + # We haven't checked that the `prev_events` have the right format + # yet, so we check as we go. + prev_events = event_d.get("prev_events", []) + if not isinstance(prev_events, list): + logger.info("Invalid prev_events for %s", event_id) + continue + + if room_version.event_format == EventFormatVersions.V1: + for prev_event_tuple in prev_events: + if not isinstance(prev_event_tuple, list) or len(prev_events) != 2: + logger.info("Invalid prev_events for %s", event_id) + break + + prev_event_id = prev_event_tuple[0] + if not isinstance(prev_event_id, str): + logger.info("Invalid prev_events for %s", event_id) + break + + referenced_events.add(prev_event_id) + else: + for prev_event_id in prev_events: + if not isinstance(prev_event_id, str): + logger.info("Invalid prev_events for %s", event_id) + break + + referenced_events.add(prev_event_id) + + to_delete = referenced_events & seen_events + if not to_delete: + return False + + pdus_pruned_from_federation_queue.inc(len(to_delete)) + logger.info( + "Pruning %d events in room %s from federation queue", + len(to_delete), + room_id, + ) + + await self.db_pool.simple_delete_many( + table="federation_inbound_events_staging", + keyvalues={"room_id": room_id}, + iterable=to_delete, + column="event_id", + desc="prune_staged_events_in_room_delete", + ) + + return True + async def get_all_rooms_with_staged_incoming_events(self) -> List[str]: """Get the room IDs of all events currently staged.""" return await self.db_pool.simple_select_onecol( diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 9253795fdccb..ea9f9b95605d 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -171,7 +171,9 @@ async def add_to_queue( end_item = queue[-1] else: # need to make a new queue item - deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) + deferred: ObservableDeferred[_PersistResult] = ObservableDeferred( + defer.Deferred(), consumeErrors=True + ) end_item = _EventPersistQueueItem( events_and_contexts=[], diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 912cf85f89be..a3b65aee27b5 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -23,6 +23,7 @@ Awaitable, Callable, Dict, + Generic, Hashable, Iterable, List, @@ -39,6 +40,7 @@ from twisted.internet.defer import CancelledError from twisted.internet.interfaces import IReactorTime from twisted.python import failure +from twisted.python.failure import Failure from synapse.logging.context import ( PreserveLoggingContext, @@ -52,7 +54,7 @@ _T = TypeVar("_T") -class ObservableDeferred: +class ObservableDeferred(Generic[_T]): """Wraps a deferred object so that we can add observer deferreds. These observer deferreds do not affect the callback chain of the original deferred. @@ -70,7 +72,7 @@ class ObservableDeferred: __slots__ = ["_deferred", "_observers", "_result"] - def __init__(self, deferred: defer.Deferred, consumeErrors: bool = False): + def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False): object.__setattr__(self, "_deferred", deferred) object.__setattr__(self, "_result", None) object.__setattr__(self, "_observers", set()) @@ -115,7 +117,7 @@ def errback(f): deferred.addCallbacks(callback, errback) - def observe(self) -> defer.Deferred: + def observe(self) -> "defer.Deferred[_T]": """Observe the underlying deferred. This returns a brand new deferred that is resolved when the underlying @@ -123,7 +125,7 @@ def observe(self) -> defer.Deferred: effect the underlying deferred. """ if not self._result: - d: "defer.Deferred[Any]" = defer.Deferred() + d: "defer.Deferred[_T]" = defer.Deferred() def remove(r): self._observers.discard(d) @@ -137,7 +139,7 @@ def remove(r): success, res = self._result return defer.succeed(res) if success else defer.fail(res) - def observers(self) -> List[defer.Deferred]: + def observers(self) -> "List[defer.Deferred[_T]]": return self._observers def has_called(self) -> bool: @@ -146,7 +148,7 @@ def has_called(self) -> bool: def has_succeeded(self) -> bool: return self._result is not None and self._result[0] is True - def get_result(self) -> Any: + def get_result(self) -> Union[_T, Failure]: return self._result[1] def __getattr__(self, name: str) -> Any: diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py index dfa30a62296a..cb08af7385eb 100644 --- a/synapse/util/versionstring.py +++ b/synapse/util/versionstring.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index ba8cf44f4626..4140fcefc2c4 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List from unittest import TestCase from synapse.api.constants import EventTypes @@ -22,6 +23,7 @@ from synapse.logging.context import LoggingContext, run_in_background from synapse.rest import admin from synapse.rest.client.v1 import login, room +from synapse.util.stringutils import random_string from tests import unittest @@ -39,6 +41,8 @@ def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver(federation_http_client=None) self.handler = hs.get_federation_handler() self.store = hs.get_datastore() + self.state_store = hs.get_storage().state + self._event_auth_handler = hs.get_event_auth_handler() return hs def test_exchange_revoked_invite(self): @@ -190,6 +194,133 @@ def test_rejected_state_event_state(self): self.assertEqual(sg, sg2) + def test_backfill_floating_outlier_membership_auth(self): + """ + As the local homeserver, check that we can properly process a federated + event from the OTHER_SERVER with auth_events that include a floating + membership event from the OTHER_SERVER. + + Regression test, see #10439. + """ + OTHER_SERVER = "otherserver" + OTHER_USER = "@otheruser:" + OTHER_SERVER + + # create the room + user_id = self.register_user("kermit", "test") + tok = self.login("kermit", "test") + room_id = self.helper.create_room_as( + room_creator=user_id, + is_public=True, + tok=tok, + extra_content={ + "preset": "public_chat", + }, + ) + room_version = self.get_success(self.store.get_room_version(room_id)) + + prev_event_ids = self.get_success(self.store.get_prev_events_for_room(room_id)) + ( + most_recent_prev_event_id, + most_recent_prev_event_depth, + ) = self.get_success(self.store.get_max_depth_of(prev_event_ids)) + # mapping from (type, state_key) -> state_event_id + prev_state_map = self.get_success( + self.state_store.get_state_ids_for_event(most_recent_prev_event_id) + ) + # List of state event ID's + prev_state_ids = list(prev_state_map.values()) + auth_event_ids = prev_state_ids + auth_events = list( + self.get_success(self.store.get_events(auth_event_ids)).values() + ) + + # build a floating outlier member state event + fake_prev_event_id = "$" + random_string(43) + member_event_dict = { + "type": EventTypes.Member, + "content": { + "membership": "join", + }, + "state_key": OTHER_USER, + "room_id": room_id, + "sender": OTHER_USER, + "depth": most_recent_prev_event_depth, + "prev_events": [fake_prev_event_id], + "origin_server_ts": self.clock.time_msec(), + "signatures": {OTHER_SERVER: {"ed25519:key_version": "SomeSignatureHere"}}, + } + builder = self.hs.get_event_builder_factory().for_room_version( + room_version, member_event_dict + ) + member_event = self.get_success( + builder.build( + prev_event_ids=member_event_dict["prev_events"], + auth_event_ids=self._event_auth_handler.compute_auth_events( + builder, + prev_state_map, + for_verification=False, + ), + depth=member_event_dict["depth"], + ) + ) + # Override the signature added from "test" homeserver that we created the event with + member_event.signatures = member_event_dict["signatures"] + + # Add the new member_event to the StateMap + prev_state_map[ + (member_event.type, member_event.state_key) + ] = member_event.event_id + auth_events.append(member_event) + + # build and send an event authed based on the member event + message_event_dict = { + "type": EventTypes.Message, + "content": {}, + "room_id": room_id, + "sender": OTHER_USER, + "depth": most_recent_prev_event_depth, + "prev_events": prev_event_ids.copy(), + "origin_server_ts": self.clock.time_msec(), + "signatures": {OTHER_SERVER: {"ed25519:key_version": "SomeSignatureHere"}}, + } + builder = self.hs.get_event_builder_factory().for_room_version( + room_version, message_event_dict + ) + message_event = self.get_success( + builder.build( + prev_event_ids=message_event_dict["prev_events"], + auth_event_ids=self._event_auth_handler.compute_auth_events( + builder, + prev_state_map, + for_verification=False, + ), + depth=message_event_dict["depth"], + ) + ) + # Override the signature added from "test" homeserver that we created the event with + message_event.signatures = message_event_dict["signatures"] + + # Stub the /event_auth response from the OTHER_SERVER + async def get_event_auth( + destination: str, room_id: str, event_id: str + ) -> List[EventBase]: + return auth_events + + self.handler.federation_client.get_event_auth = get_event_auth + + with LoggingContext("receive_pdu"): + # Fake the OTHER_SERVER federating the message event over to our local homeserver + d = run_in_background( + self.handler.on_receive_pdu, OTHER_SERVER, message_event + ) + self.get_success(d) + + # Now try and get the events on our local homeserver + stored_event = self.get_success( + self.store.get_event(message_event.event_id, allow_none=True) + ) + self.assertTrue(stored_event is not None) + @unittest.override_config( {"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}} ) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index a0e22594785f..c3fcf7e7b405 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -15,7 +15,9 @@ import attr from parameterized import parameterized +from synapse.api.room_versions import RoomVersions from synapse.events import _EventInternalMetadata +from synapse.util import json_encoder import tests.unittest import tests.utils @@ -504,6 +506,61 @@ def insert_event(txn): ) self.assertSetEqual(difference, set()) + def test_prune_inbound_federation_queue(self): + "Test that pruning of inbound federation queues work" + + room_id = "some_room_id" + + # Insert a bunch of events that all reference the previous one. + self.get_success( + self.store.db_pool.simple_insert_many( + table="federation_inbound_events_staging", + values=[ + { + "origin": "some_origin", + "room_id": room_id, + "received_ts": 0, + "event_id": f"$fake_event_id_{i + 1}", + "event_json": json_encoder.encode( + {"prev_events": [f"$fake_event_id_{i}"]} + ), + "internal_metadata": "{}", + } + for i in range(500) + ], + desc="test_prune_inbound_federation_queue", + ) + ) + + # Calling prune once should return True, i.e. a prune happen. The second + # time it shouldn't. + pruned = self.get_success( + self.store.prune_staged_events_in_room(room_id, RoomVersions.V6) + ) + self.assertTrue(pruned) + + pruned = self.get_success( + self.store.prune_staged_events_in_room(room_id, RoomVersions.V6) + ) + self.assertFalse(pruned) + + # Assert that we only have a single event left in the queue, and that it + # is the last one. + count = self.get_success( + self.store.db_pool.simple_select_one_onecol( + table="federation_inbound_events_staging", + keyvalues={"room_id": room_id}, + retcol="COALESCE(COUNT(*), 0)", + desc="test_prune_inbound_federation_queue", + ) + ) + self.assertEqual(count, 1) + + _, event_id = self.get_success( + self.store.get_next_staged_event_id_for_room(room_id) + ) + self.assertEqual(event_id, "$fake_event_id_500") + @attr.s class FakeEvent: diff --git a/tests/storage/test_txn_limit.py b/tests/storage/test_txn_limit.py new file mode 100644 index 000000000000..6ff3ebb13780 --- /dev/null +++ b/tests/storage/test_txn_limit.py @@ -0,0 +1,36 @@ +# Copyright 2014-2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from tests import unittest + + +class SQLTransactionLimitTestCase(unittest.HomeserverTestCase): + """Test SQL transaction limit doesn't break transactions.""" + + def make_homeserver(self, reactor, clock): + return self.setup_test_homeserver(db_txn_limit=1000) + + def test_config(self): + db_config = self.hs.config.get_single_database() + self.assertEqual(db_config.config["txn_limit"], 1000) + + def test_select(self): + def do_select(txn): + txn.execute("SELECT 1") + + db_pool = self.hs.get_datastores().databases[0] + + # force txn limit to roll over at least once + for _ in range(0, 1001): + self.get_success_or_raise(db_pool.runInteraction("test_select", do_select)) diff --git a/tests/utils.py b/tests/utils.py index 6bd008dcfe21..f3458ca88dfc 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -239,6 +239,9 @@ def setup_test_homeserver( "args": {"database": ":memory:", "cp_min": 1, "cp_max": 1}, } + if "db_txn_limit" in kwargs: + database_config["txn_limit"] = kwargs["db_txn_limit"] + database = DatabaseConnectionConfig("master", database_config) config.database.databases = [database]