Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Optimize backfill receiving to have less missing prev_event thrashing
Browse files Browse the repository at this point in the history
Pulled from scratch changes in,
#13864
  • Loading branch information
MadLittleMods committed Sep 30, 2022
1 parent 6f0c3e6 commit 12c15b1
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 3 deletions.
68 changes: 66 additions & 2 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
Expand Down Expand Up @@ -644,9 +645,70 @@ async def backfill(
f"room {ev.room_id}, when we were backfilling in {room_id}"
)

# We expect the events from the `/backfill` response to start from
# `?v` and include events that preceded it (so the list will be
# newest -> oldest, reverse-chronological). It's described in the
# spec this way so we can rely on people doing it the right way for
# the historical messages to show up correctly.
reverse_chronological_events = events
# `[::-1]` is just syntax to reverse the list and give us a copy
chronological_events = reverse_chronological_events[::-1]

# We want to calculate the `stream_ordering` from newest -> oldest
# (reverse-chronological) (so historical events end up sorting in
# the correct order) and persist oldest -> newest (chronological) to
# get the least missing `prev_event` fetch thrashing.
# ------------------------------------------------------------------

# Since we have been configured to write, we ought to have id generators,
# rather than id trackers.
assert (
self._instance_name in self._config.worker.writers.events
), "Can only write stream IDs on master"
assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator)
stream_ordering_manager = self._store._backfill_id_gen.get_next_mult(
len(reverse_chronological_events)
)
async with stream_ordering_manager as stream_orderings:
# Calculate the `stream_ordering` from newest -> oldest
# (reverse-chronological) (so historical events end up sorting
# in the correct order).
#
# Backfilled events start with `stream_ordering=-1` and
# decrement. For events, that we backfill at the same `depth`
# (like chains of historical messages) in order for them to have
# the best chance of ending up in the correct order, assign
# `stream_ordering` to the assumed reverse-chronological list of
# events to backfill (where the newest events get
# stream_ordering assigned first)
#
# depth : stream_ordering : event
# ----- : --------------- : -----------------------
# 1 : 1 : Event before 1
# 2 : 2 : Event before 2
# 3 : -4 : Historical message 1
# 3 : -4 : Historical message 2
# 3 : -3 : Historical message 3
# 3 : -2 : Historical message 4
# 3 : -1 : Historical message 5
# 3 : 3 : Event after 1
# 4 : 4 : Event after 2
#
for event, stream in zip(
reverse_chronological_events, stream_orderings
):
event.internal_metadata.stream_ordering = stream

await self._process_pulled_events(
dest,
events,
# Persist events from oldest -> newest (chronological) to get
# the least missing `prev_event` fetch thrashing.
# `_process_pulled_events` does some sorting of its own by
# `depth` but if we let it sort the reverse-chronological list
# of events, it naively orders events with the same depth in the
# opposite order we want. If we pass it an already sorted by
# depth list, then everything lines up.
chronological_events,
backfilled=True,
)

Expand Down Expand Up @@ -794,7 +856,9 @@ async def _process_pulled_events(

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)
sorted_events = sorted(
events, key=lambda x: (x.depth, x.internal_metadata.stream_ordering or 0)
)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
Expand Down
6 changes: 5 additions & 1 deletion synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,11 @@ async def _persist_events_and_state_updates(

async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
# If someone has already decided the stream_ordering for the
# event before, then just use that. This is done during backfill
# to help ordering of MSC2716 historical messages.
if event.internal_metadata.stream_ordering is None:
event.internal_metadata.stream_ordering = stream

await self.db_pool.runInteraction(
"persist_events",
Expand Down

0 comments on commit 12c15b1

Please sign in to comment.