Fix bug in account data replication stream. (#7656)
* Ensure account data stream IDs are unique.

The account data stream is shared between three tables, and the maximum
allocated ID was tracked in a dedicated table. The max ID was updated
outside the transaction that allocated the ID, leading to a race: if the
server was restarted after an ID had been allocated but before the max ID
was updated, the same ID could be allocated again and reused.

The ID generators already support tracking the maximum stream ID across
multiple tables, so we may as well use that instead of a dedicated table.
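
To make that concrete, here is a minimal, self-contained sketch (not Synapse's actual `StreamIdGenerator`; the helper and the SQLite setup are illustrative) of seeding an ID allocator from the maximum value found across several tables, so no separate max-ID table has to be kept in sync:

import sqlite3

def max_stream_id(conn, tables):
    """Return the highest stream_id across the given tables (0 if all are empty)."""
    # Illustrative sketch only; Synapse's real generator does more (e.g. in-flight IDs).
    current = 0
    for table in tables:
        (value,) = conn.execute(
            "SELECT COALESCE(MAX(stream_id), 0) FROM %s" % table
        ).fetchone()
        current = max(current, value)
    return current

conn = sqlite3.connect(":memory:")
for table in ("account_data", "room_account_data", "room_tags_revisions"):
    conn.execute("CREATE TABLE %s (stream_id INTEGER)" % table)
conn.execute("INSERT INTO room_account_data VALUES (534)")

# Seed the allocator from the data tables themselves: even if the server
# crashes right after an insert, the next start-up sees the real maximum,
# so an ID can never be handed out twice.
tables = ["account_data", "room_account_data", "room_tags_revisions"]
next_id = max_stream_id(conn, tables) + 1
print(next_id)  # 535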

* Fix bug in account data replication stream.

If the same stream ID was used in both global and room account data, then
getting updates for the replication stream would fail because
`heapq.merge(..)` tried to compare a `str` with `None`. (This is because
you'd have two rows like `(534, '!room')` and `(534, None)`, one from the
room account data table and one from the global account data table.)

The fix is simply to order by stream ID, since we don't rely on any ordering
beyond that. The bug that allowed stream IDs to be reused should now be
fixed, so this case shouldn't arise going forward.
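
As a standalone illustration (rows are simplified here to `(stream_id, room_id)` pairs rather than the exact tuples the stream uses), the following sketch reproduces the failure and shows why keying the merge on the stream ID alone avoids it:

import heapq

# Illustrative rows: room account data carries a room ID, global account
# data carries None. A duplicated stream ID means both lists contain an
# entry with the same first element.
room_rows = [(534, "!room:example.com")]
global_rows = [(534, None)]

# Plain tuple comparison falls through to the second element on a tie,
# and comparing "!room:example.com" with None raises TypeError.
try:
    list(heapq.merge(room_rows, global_rows))
except TypeError as exc:
    print("merge failed:", exc)

# Keying the merge on the stream ID alone never touches the room IDs.
merged = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))
print(merged)  # [(534, '!room:example.com'), (534, None)]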

Fixes #7617
erikjohnston authored Jun 9, 2020
1 parent 3c45a78 commit 664409b
Showing 6 changed files with 35 additions and 4 deletions.
1 change: 1 addition & 0 deletions changelog.d/7656.bugfix
@@ -0,0 +1 @@
+Fix bug in account data replication stream.
8 changes: 7 additions & 1 deletion synapse/replication/slave/storage/account_data.py
@@ -24,7 +24,13 @@
 class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = SlavedIdTracker(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )

         super(SlavedAccountDataStore, self).__init__(database, db_conn, hs)
10 changes: 8 additions & 2 deletions synapse/replication/tcp/streams/_base.py
@@ -600,8 +600,14 @@ async def _update_function(
             for stream_id, user_id, room_id, account_data_type in room_results
         )

-        # we need to return a sorted list, so merge them together.
-        updates = list(heapq.merge(room_rows, global_rows))
+        # We need to return a sorted list, so merge them together.
+        #
+        # Note: We order only by the stream ID to work around a bug where the
+        # same stream ID could appear in both `global_rows` and `room_rows`,
+        # leading to a comparison between the data tuples. The comparison could
+        # fail due to attempting to compare the `room_id` which results in a
+        # `TypeError` from comparing a `str` vs `None`.
+        updates = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))
         return updates, to_token, limited


16 changes: 15 additions & 1 deletion synapse/storage/data_stores/main/account_data.py
@@ -297,7 +297,13 @@ def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
 class AccountDataStore(AccountDataWorkerStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = StreamIdGenerator(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data_max_stream_id",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )

         super(AccountDataStore, self).__init__(database, db_conn, hs)
@@ -387,6 +393,10 @@ def add_account_data_for_user(self, user_id, account_data_type, content):
             # doesn't sound any worse than the whole update getting lost,
             # which is what would happen if we combined the two into one
             # transaction.
+            #
+            # Note: This is only here for backwards compat to allow admins to
+            # roll back to a previous Synapse version. Next time we update the
+            # database version we can remove this table.
             yield self._update_max_stream_id(next_id)

             self._account_data_stream_cache.entity_has_changed(user_id, next_id)
@@ -405,6 +415,10 @@ def _update_max_stream_id(self, next_id):
             next_id(int): The the revision to advance to.
         """

+        # Note: This is only here for backwards compat to allow admins to
+        # roll back to a previous Synapse version. Next time we update the
+        # database version we can remove this table.
+
         def _update(txn):
             update_max_id_sql = (
                 "UPDATE account_data_max_stream_id"
3 changes: 3 additions & 0 deletions synapse/storage/data_stores/main/tags.py
@@ -233,6 +233,9 @@ def _update_revision_txn(self, txn, user_id, room_id, next_id):
             self._account_data_stream_cache.entity_has_changed, user_id, next_id
         )

+        # Note: This is only here for backwards compat to allow admins to
+        # roll back to a previous Synapse version. Next time we update the
+        # database version we can remove this table.
         update_max_id_sql = (
             "UPDATE account_data_max_stream_id"
             " SET stream_id = ?"
1 change: 1 addition & 0 deletions synapse/storage/prepare_database.py
@@ -33,6 +33,7 @@
 # schema files, so the users will be informed on server restarts.
 # XXX: If you're about to bump this to 59 (or higher) please create an update
 # that drops the unused `cache_invalidation_stream` table, as per #7436!
+# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656!
 SCHEMA_VERSION = 58

 dir_path = os.path.abspath(os.path.dirname(__file__))
