Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Refactor backfilled into specific behavior function arguments (`_pe…
Browse files Browse the repository at this point in the history
…rsist_events_and_state_updates`) (#11417)

Part of #11300

Call stack:

 - `_persist_events_and_state_updates` (added `use_negative_stream_ordering`)
    - `_persist_events_txn`
       - `_update_room_depths_txn` (added `update_room_forward_stream_ordering`)
       - `_update_metadata_tables_txn`
          - `_store_room_members_txn` (added `inhibit_local_membership_updates`)

Using keyword-only arguments (`*`) to reduce the mistakes from `backfilled` being left as a positional argument somewhere and being interpreted wrong by our new arguments.
  • Loading branch information
MadLittleMods authored Nov 29, 2021
1 parent a4521ce commit fb58611
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 21 deletions.
1 change: 1 addition & 0 deletions changelog.d/11417.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `backfilled` into specific behavior function arguments (`_persist_events_and_state_updates` and downstream calls).
74 changes: 54 additions & 20 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ def __init__(
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
*,
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
Expand All @@ -140,7 +142,14 @@ async def _persist_events_and_state_updates(
room state
new_forward_extremities: Map from room_id to list of event IDs
that are the new forward extremities of the room.
backfilled
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
Returns:
Resolves when the events have been persisted
Expand All @@ -162,7 +171,7 @@ async def _persist_events_and_state_updates(
#
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
if backfilled:
if use_negative_stream_ordering:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
Expand All @@ -179,13 +188,13 @@ async def _persist_events_and_state_updates(
"persist_events",
self._persist_events_txn,
events_and_contexts=events_and_contexts,
backfilled=backfilled,
inhibit_local_membership_updates=inhibit_local_membership_updates,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc(len(events_and_contexts))

if not backfilled:
if stream < 0:
# backfilled events have negative stream orderings, so we don't
# want to set the event_persisted_position to that.
synapse.metrics.event_persisted_position.set(
Expand Down Expand Up @@ -319,8 +328,9 @@ def _get_prevs_before_rejected_txn(txn, batch):
def _persist_events_txn(
self,
txn: LoggingTransaction,
*,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
inhibit_local_membership_updates: bool = False,
state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
):
Expand All @@ -333,7 +343,10 @@ def _persist_events_txn(
Args:
txn
events_and_contexts: events to persist
backfilled: True if the events were backfilled
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
Expand Down Expand Up @@ -366,9 +379,7 @@ def _persist_events_txn(
events_and_contexts
)

self._update_room_depths_txn(
txn, events_and_contexts=events_and_contexts, backfilled=backfilled
)
self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts)

# _update_outliers_txn filters out any events which have already been
# persisted, and returns the filtered list.
Expand Down Expand Up @@ -401,7 +412,7 @@ def _persist_events_txn(
txn,
events_and_contexts=events_and_contexts,
all_events_and_contexts=all_events_and_contexts,
backfilled=backfilled,
inhibit_local_membership_updates=inhibit_local_membership_updates,
)

# We call this last as it assumes we've inserted the events into
Expand Down Expand Up @@ -1203,21 +1214,25 @@ def _update_room_depths_txn(
self,
txn,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
):
"""Update min_depth for each room
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
backfilled (bool): True if the events were backfilled
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
if not backfilled:
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
# stream_ordering and happened in the past so we know that we don't
# need to update the stream_ordering tip/front for the room.
assert event.internal_metadata.stream_ordering is not None
if event.internal_metadata.stream_ordering >= 0:
txn.call_after(
self.store._events_stream_cache.entity_has_changed,
event.room_id,
Expand Down Expand Up @@ -1430,7 +1445,12 @@ def _store_rejected_events_txn(self, txn, events_and_contexts):
return [ec for ec in events_and_contexts if ec[0] not in to_remove]

def _update_metadata_tables_txn(
self, txn, events_and_contexts, all_events_and_contexts, backfilled
self,
txn,
*,
events_and_contexts,
all_events_and_contexts,
inhibit_local_membership_updates: bool = False,
):
"""Update all the miscellaneous tables for new events
Expand All @@ -1442,7 +1462,10 @@ def _update_metadata_tables_txn(
events that we were going to persist. This includes events
we've already persisted, etc, that wouldn't appear in
events_and_context.
backfilled (bool): True if the events were backfilled
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
"""

# Insert all the push actions into the event_push_actions table.
Expand Down Expand Up @@ -1516,7 +1539,7 @@ def _update_metadata_tables_txn(
for event, _ in events_and_contexts
if event.type == EventTypes.Member
],
backfilled=backfilled,
inhibit_local_membership_updates=inhibit_local_membership_updates,
)

# Insert event_reference_hashes table.
Expand Down Expand Up @@ -1643,8 +1666,19 @@ def _store_event_reference_hashes_txn(self, txn, events):
txn, table="event_reference_hashes", values=vals
)

def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database."""
def _store_room_members_txn(
self, txn, events, *, inhibit_local_membership_updates: bool = False
):
"""
Store a room member in the database.
Args:
txn: The transaction to use.
events: List of events to store.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
"""

def non_null_str_or_none(val: Any) -> Optional[str]:
return val if isinstance(val, str) and "\u0000" not in val else None
Expand Down Expand Up @@ -1687,7 +1721,7 @@ def non_null_str_or_none(val: Any) -> Optional[str]:
# band membership", like a remote invite or a rejection of a remote invite.
if (
self.is_mine_id(event.state_key)
and not backfilled
and not inhibit_local_membership_updates
and event.internal_metadata.is_outlier()
and event.internal_metadata.is_out_of_band_membership()
):
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ async def _persist_event_batch(
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
backfilled=backfilled,
use_negative_stream_ordering=backfilled,
inhibit_local_membership_updates=backfilled,
)

await self._handle_potentially_left_users(potentially_left_users)
Expand Down

0 comments on commit fb58611

Please sign in to comment.