Skip to content

Commit

Permalink
Properly purge state groups tables when purging a room (#18024)
Browse files Browse the repository at this point in the history
Currently purging a complex room can lead to a lot of orphaned rows left
behind in the state groups tables.
It seems it is because we are loosing track of state groups sometimes.

This change uses the `room_id` indexed column of `state_groups` table to
decide what to delete instead of doing an indirection through
`event_to_state_groups`.

Related to #3364.

### Pull Request Checklist

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
* [x] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Erik Johnston <erikj@jki.re>
  • Loading branch information
MatMaul and erikjohnston authored Jan 6, 2025
1 parent 6306de8 commit b3ba501
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 69 deletions.
1 change: 1 addition & 0 deletions changelog.d/18024.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Properly purge state groups tables when purging a room with the admin API.
4 changes: 2 additions & 2 deletions synapse/storage/controllers/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ async def purge_room(self, room_id: str) -> None:
"""Deletes all record of a room"""

with nested_logging_context(room_id):
state_groups_to_delete = await self.stores.main.purge_room(room_id)
await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
await self.stores.main.purge_room(room_id)
await self.stores.state.purge_room_state(room_id)

async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
Expand Down
36 changes: 9 additions & 27 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#

import logging
from typing import Any, List, Set, Tuple, cast
from typing import Any, Set, Tuple, cast

from synapse.api.errors import SynapseError
from synapse.storage.database import LoggingTransaction
Expand Down Expand Up @@ -332,7 +332,7 @@ def _purge_history_txn(

return referenced_state_groups

async def purge_room(self, room_id: str) -> List[int]:
async def purge_room(self, room_id: str) -> None:
"""Deletes all record of a room
Args:
Expand All @@ -348,26 +348,23 @@ async def purge_room(self, room_id: str) -> List[int]:
# purge any of those rows which were added during the first.

logger.info("[purge] Starting initial main purge of [1/2]")
state_groups_to_delete = await self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"purge_room",
self._purge_room_txn,
room_id=room_id,
isolation_level=IsolationLevel.READ_COMMITTED,
)

logger.info("[purge] Starting secondary main purge of [2/2]")
state_groups_to_delete.extend(
await self.db_pool.runInteraction(
"purge_room",
self._purge_room_txn,
room_id=room_id,
),
await self.db_pool.runInteraction(
"purge_room",
self._purge_room_txn,
room_id=room_id,
)
logger.info("[purge] Done with main purge")

return state_groups_to_delete
logger.info("[purge] Done with main purge")

def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> None:
# This collides with event persistence so we cannot write new events and metadata into
# a room while deleting it or this transaction will fail.
if isinstance(self.database_engine, PostgresEngine):
Expand All @@ -381,19 +378,6 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
# take a while!
txn.execute("SET LOCAL statement_timeout = 0")

# First, fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
"""
SELECT DISTINCT state_group FROM events
INNER JOIN event_to_state_groups USING(event_id)
WHERE events.room_id = ?
""",
(room_id,),
)

state_groups = [row[0] for row in txn]

# Get all the auth chains that are referenced by events that are to be
# deleted.
txn.execute(
Expand Down Expand Up @@ -513,5 +497,3 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
# periodically anyway (https://github.com/matrix-org/synapse/issues/5888)

self._invalidate_caches_for_room_and_stream(txn, room_id)

return state_groups
60 changes: 21 additions & 39 deletions synapse/storage/databases/state/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,60 +840,42 @@ async def get_previous_state_groups(

return dict(rows)

async def purge_room_state(
self, room_id: str, state_groups_to_delete: Collection[int]
) -> None:
"""Deletes all record of a room from state tables
Args:
room_id:
state_groups_to_delete: State groups to delete
"""

logger.info("[purge] Starting state purge")
await self.db_pool.runInteraction(
async def purge_room_state(self, room_id: str) -> None:
return await self.db_pool.runInteraction(
"purge_room_state",
self._purge_room_state_txn,
room_id,
state_groups_to_delete,
)
logger.info("[purge] Done with state purge")

def _purge_room_state_txn(
self,
txn: LoggingTransaction,
room_id: str,
state_groups_to_delete: Collection[int],
) -> None:
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)

self.db_pool.simple_delete_many_txn(
txn,
table="state_groups_state",
column="state_group",
values=state_groups_to_delete,
keyvalues={},
)

# ... and the state group edges
# Delete all edges that reference a state group linked to room_id
logger.info("[purge] removing %s from state_group_edges", room_id)
txn.execute(
"""
DELETE FROM state_group_edges AS sge WHERE sge.state_group IN (
SELECT id FROM state_groups AS sg WHERE sg.room_id = ?
)""",
(room_id,),
)

self.db_pool.simple_delete_many_txn(
txn,
table="state_group_edges",
column="state_group",
values=state_groups_to_delete,
keyvalues={},
# state_groups_state table has a room_id column but no index on it, unlike state_groups,
# so we delete them by matching the room_id through the state_groups table.
logger.info("[purge] removing %s from state_groups_state", room_id)
txn.execute(
"""
DELETE FROM state_groups_state AS sgs WHERE sgs.state_group IN (
SELECT id FROM state_groups AS sg WHERE sg.room_id = ?
)""",
(room_id,),
)

# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)

self.db_pool.simple_delete_many_txn(
self.db_pool.simple_delete_txn(
txn,
table="state_groups",
column="id",
values=state_groups_to_delete,
keyvalues={},
keyvalues={"room_id": room_id},
)
2 changes: 1 addition & 1 deletion tests/rest/admin/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -3050,7 +3050,7 @@ def _block_room(self, room_id: str) -> None:
"pusher_throttle",
"room_account_data",
"room_tags",
# "state_groups", # Current impl leaves orphaned state groups around.
"state_groups",
"state_groups_state",
"federation_inbound_events_staging",
]

0 comments on commit b3ba501

Please sign in to comment.