
Re-use work of getting state for a given state_group (_get_state_groups_from_groups) #15617

1 change: 1 addition & 0 deletions changelog.d/15617.feature
@@ -0,0 +1 @@
Make `/messages` faster by efficiently grabbing state out of the database whenever we have to backfill and process new events.
90 changes: 77 additions & 13 deletions synapse/storage/databases/state/bg_updates.py
@@ -13,7 +13,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Set, Tuple, Union

from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
@@ -89,6 +89,10 @@ def _get_state_groups_from_groups_txn(
groups: List[int],
state_filter: Optional[StateFilter] = None,
) -> Mapping[int, StateMap[str]]:
"""
TODO
"""

state_filter = state_filter or StateFilter.all()

results: Dict[int, MutableStateMap[str]] = {group: {} for group in groups}
@@ -106,11 +110,22 @@ def _get_state_groups_from_groups_txn(
# This may return multiple rows per (type, state_key), but last_value
# should be the same.
sql = """
WITH RECURSIVE sgs(state_group) AS (
VALUES(?::bigint)
WITH RECURSIVE sgs(state_group, state_group_reached) AS (
VALUES(?::bigint, NULL::bigint)
UNION ALL
SELECT prev_state_group FROM state_group_edges e, sgs s
WHERE s.state_group = e.state_group
SELECT
prev_state_group,
CASE
/* Specify state_groups we have already done the work for */
WHEN prev_state_group IN (%s) THEN prev_state_group
ELSE NULL
END AS state_group_reached
FROM
state_group_edges e, sgs s
WHERE
s.state_group = e.state_group
/* Stop when we connect up to another state_group that we already did the work for */
AND s.state_group_reached IS NULL
)
%s
"""
@@ -154,7 +169,7 @@ def _get_state_groups_from_groups_txn(
f"""
(
SELECT DISTINCT ON (type, state_key)
type, state_key, event_id
type, state_key, event_id, state_group
FROM state_groups_state
INNER JOIN sgs USING (state_group)
WHERE {where_clause}
@@ -175,23 +190,71 @@ def _get_state_groups_from_groups_txn(

overall_select_clause = f"""
SELECT DISTINCT ON (type, state_key)
type, state_key, event_id
type, state_key, event_id, state_group
FROM state_groups_state
WHERE state_group IN (
SELECT state_group FROM sgs
) {where_clause}
ORDER BY type, state_key, state_group DESC
"""

for group in groups:
# We can sort from smallest to largest state_group and re-use the work from
# the small state_group for a larger one if we see that the edge chain links
# up.
sorted_groups = sorted(groups)
state_groups_we_have_already_fetched: Set[int] = set()
for group in sorted_groups:
args: List[Union[int, str]] = [group]
args.extend(overall_select_query_args)

txn.execute(sql % (overall_select_clause,), args)
state_groups_we_have_already_fetched_string = ", ".join(
[
f"{state_group}::bigint"
# We default to `[-1]` just to fill in the query with something
# that will have no effect
for state_group in state_groups_we_have_already_fetched or [-1]
]
)

txn.execute(
sql
% (
state_groups_we_have_already_fetched_string,
overall_select_clause,
),
args,
)

min_state_group: Optional[int] = None
partial_state_map_for_state_group: MutableStateMap[str] = {}
for row in txn:
typ, state_key, event_id = row
typ, state_key, event_id, state_group = row
key = (intern_string(typ), intern_string(state_key))
results[group][key] = event_id
partial_state_map_for_state_group[key] = event_id

if min_state_group is None or state_group < min_state_group:
min_state_group = state_group

# If the state group edge chain links up to a previous state_group that
# we already fetched from the database, use that result as the base
# state and layer the partial state we just retrieved on top of it.
if (
min_state_group is not None
and results.get(min_state_group) is not None
):
resultant_state_map = dict(results[min_state_group])
resultant_state_map.update(partial_state_map_for_state_group)

results[group] = resultant_state_map
else:
# It's also completely normal for us not to have a previous
# state_group to build on top of if this is the first group being
# processed or we are processing a bunch of groups from different
# rooms which of course will never link together.
results[group] = partial_state_map_for_state_group

state_groups_we_have_already_fetched.add(group)
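# Illustration of the re-use (hypothetical numbers, not from this PR): given
# groups [5, 8] where group 8's edge chain eventually reaches group 5, the
# first iteration fetches and stores the full state for 5; the second
# iteration's recursive query stops as soon as it reaches 5, returns only the
# rows between 8 and 5, and the branch above grafts them on top of results[5].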

@MadLittleMods (Contributor, Author) commented on May 18, 2023:

After

[...]

In the sample request above, simulating how the app would make the queries with the changes in this PR, we don't actually see that much benefit because it turns out that there isn't much state_group sharing amongst events. That 14s SQL query is just fetching the state at each of the prev_events of one event and only one seems to share state_groups.

[...]

Initially, I thought the lack of sharing was quite strange, but this is because of the state_group snapshotting feature: if an event requires more than 100 hops on state_group_edges, then Synapse will create a new state_group with a snapshot of all of that state. It seems like this isn't done very efficiently though. Relevant docs.

And it turns out the event is an org.matrix.dummy_event, which Synapse automatically puts in the DAG to resolve outstanding forward extremities. These events aren't even shown to clients, so we don't even need to waste time waiting for them to backfill. Tracked by #15632.

Generally, I think this PR could bring great gains in conjunction with running some sort of state compressor over the database to get a lot more sharing, in addition to making the online state_group snapshotting logic smarter. I don't know how the existing state compressors work, but I imagine we could create snapshots bucketed by years -> months -> weeks -> days -> hours -> individual events and build new state_group chains that use these from biggest to smallest to get maximal sharing.

-- "After" section of the PR description

This PR hasn't made as big of an impact as I thought it would for that type of request. Are we still interested in a change like this? It may work well for sequential events that we backfill.

It seems like our state_group sharing is really sub-par, and the fact that state_groups can only have a max of 100 hops puts an upper limit on how much gain this PR can give. I didn't anticipate that's how state_groups worked; I thought it was one state_group per state change, which it is until it starts doing snapshots.

Maybe it's more interesting to improve our state_group logic to be much smarter first, and re-visit something like this afterwards. Or look into the state compressor stuff to optimize our backlog, which would help the Matrix Public Archive. I'm not sure whether the current state compressors optimize for disk space or for sharing, or how inter-related those two goals are.
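To make the intended re-use concrete, here is a minimal sketch of the idea in plain Python with made-up in-memory data (the `edges`/`deltas` dicts, the event IDs, and the `get_state` helper are all hypothetical; the real code does this walk in SQL via the recursive CTE above):

from typing import Dict, List, Optional, Tuple

StateMap = Dict[Tuple[str, str], str]

# Hypothetical edge chain and per-group state deltas (made-up data).
edges: Dict[int, Optional[int]] = {1: None, 2: 1, 3: 2}
deltas: Dict[int, StateMap] = {
    1: {("m.room.create", ""): "$create"},
    2: {("m.room.member", "@alice:test"): "$join"},
    3: {("m.room.topic", ""): "$topic"},
}

def get_state(groups: List[int]) -> Dict[int, StateMap]:
    results: Dict[int, StateMap] = {}
    # Work from the smallest group upwards so later groups can build on
    # state we already resolved for earlier ones.
    for group in sorted(groups):
        partial: StateMap = {}
        current: Optional[int] = group
        while current is not None:
            if current in results:
                # Re-use the earlier work: graft our partial state on top
                # of the state already resolved for this group.
                base = dict(results[current])
                base.update(partial)
                partial = base
                break
            # We walk from newest to oldest, so never overwrite newer entries.
            for key, event_id in deltas[current].items():
                partial.setdefault(key, event_id)
            current = edges[current]
        results[group] = partial
    return results

# Resolving group 3 re-uses the state already computed for group 2.
print(get_state([2, 3]))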

else:
@MadLittleMods (Contributor, Author) commented:

To not complicate the diff, I've held off on applying the same treatment to SQLite.

We can iterate on this in another PR, or just opt for people to use Postgres in order to see the performance benefit.
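For reference, a rough sketch of what the SQLite branch could look like if the same recursive-CTE treatment were applied later (this is an assumption, not part of this PR: the `fetch_state_sqlite` helper is hypothetical, the `already_fetched` contents are illustrative, and since SQLite has no `DISTINCT ON` it leans on SQLite's documented behaviour that bare columns selected alongside `max()` come from the row holding the maximum):

import sqlite3
from typing import Iterable, Set, Tuple

def fetch_state_sqlite(
    txn: sqlite3.Cursor, group: int, already_fetched: Set[int]
) -> Iterable[Tuple[str, str, str, int]]:
    """Hypothetical SQLite variant of the recursive walk (not in this PR)."""
    # Default to {-1} so the IN (...) clause is valid but matches nothing.
    already_fetched_clause = ", ".join(
        str(sg) for sg in (already_fetched or {-1})
    )
    sql = f"""
        WITH RECURSIVE sgs(state_group, state_group_reached) AS (
            VALUES(?, NULL)
            UNION ALL
            SELECT
                e.prev_state_group,
                CASE
                    /* Stop once we link up to a group we already fetched */
                    WHEN e.prev_state_group IN ({already_fetched_clause})
                        THEN e.prev_state_group
                    ELSE NULL
                END
            FROM state_group_edges AS e, sgs AS s
            WHERE s.state_group = e.state_group
                AND s.state_group_reached IS NULL
        )
        /* SQLite has no DISTINCT ON; with max() the other bare columns are
           taken from the row holding the max state_group (the newest). */
        SELECT type, state_key, event_id, max(state_group)
        FROM state_groups_state
        WHERE state_group IN (SELECT state_group FROM sgs)
        GROUP BY type, state_key
    """
    txn.execute(sql, (group,))
    return txn.fetchall()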

max_entries_returned = state_filter.max_entries_returned()

@@ -201,8 +264,9 @@ def _get_state_groups_from_groups_txn(
if where_clause:
where_clause = " AND (%s)" % (where_clause,)

# We don't use WITH RECURSIVE on sqlite3 as there are distributions
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
# XXX: We could use `WITH RECURSIVE` here, since it's supported on SQLite
# 3.8.3 and higher and our minimum supported version is above that. We just
# haven't put in the time to refactor this.
for group in groups:
next_group: Optional[int] = group
