Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Expand comments around different replication process methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Fizzadar committed Jan 3, 2023
1 parent 427972f commit 537cb40
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
2 changes: 2 additions & 0 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ async def on_rdata(
rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
"""
self.store.process_replication_rows(stream_name, instance_name, token, rows)
# NOTE: this must be called after process_replication_rows to ensure any
# cache invalidations are first handled before any stream ID advances.
self.store.process_replication_position(stream_name, instance_name, token)

if self.send_handler:
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def process_replication_rows( # noqa: B027 (no-op by design)
rows: Iterable[Any],
) -> None:
"""
Used by storage classes to invalidate caches based on incoming replication data.
Used by storage classes to invalidate caches based on incoming replication data. These
must not update any ID generators, use `process_replication_position`.
"""

def process_replication_position( # noqa: B027 (no-op by design)
Expand Down

0 comments on commit 537cb40

Please sign in to comment.