Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add optional stream ID generator to the StreamChangeCache
Browse files Browse the repository at this point in the history
This will assert that any changes being flagged have tokens ahead of
the current known position. If the change is at or behind the current
token there is a race condition during which changes may be incorrectly
missed, this guards against that.
  • Loading branch information
Fizzadar committed Dec 9, 2022
1 parent 3dea38b commit 3a86f75
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,12 @@ def __init__(
"EventsRoomStreamChangeCache",
min_event_val,
prefilled_cache=event_cache_prefill,
stream_id_gen=self._stream_id_gen,
)
self._membership_stream_cache = StreamChangeCache(
"MembershipStreamChangeCache", events_max
"MembershipStreamChangeCache",
events_max,
stream_id_gen=self._stream_id_gen,
)

if hs.config.worker.run_background_tasks:
Expand Down
9 changes: 9 additions & 0 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import attr
from sortedcontainers import SortedDict

from synapse.storage.util.id_generators import AbstractStreamIdTracker
from synapse.util import caches

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -70,6 +71,7 @@ def __init__(
current_stream_pos: int,
max_size: int = 10000,
prefilled_cache: Optional[Mapping[EntityType, int]] = None,
stream_id_gen: Optional[AbstractStreamIdTracker] = None,
) -> None:
self._original_max_size: int = max_size
self._max_size = math.floor(max_size)
Expand All @@ -92,6 +94,8 @@ def __init__(
"cache", self.name, self._cache, resize_callback=self.set_cache_factor
)

self.stream_id_gen = stream_id_gen

if prefilled_cache:
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
Expand Down Expand Up @@ -271,6 +275,11 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
if stream_pos <= self._earliest_known_stream_pos:
return

# Any change being flagged must be ahead of any current token, otherwise
# we have a race condition between token position and stream change cache.
if self.stream_id_gen:
assert stream_pos > self.stream_id_gen.get_current_token()

old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
if old_pos >= stream_pos:
Expand Down

0 comments on commit 3a86f75

Please sign in to comment.