Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve opentracing support for ResponseCache #11607

Merged
merged 9 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11607.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve opentracing support for requests which use a `ResponseCache`.
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def upgrade_room(
user_id = requester.user.to_string()

# Check if this room is already being upgraded by another person
for key in self._upgrade_response_cache.pending_result_cache:
for key in self._upgrade_response_cache.keys():
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only reference to `ResponseCache.pending_result_cache` outside `ResponseCache` itself.

if key[0] == old_room_id and key[1] != user_id:
# Two different people are trying to upgrade the same room.
# Send the second an error.
Expand Down
17 changes: 16 additions & 1 deletion synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import collections
import inspect
import itertools
Expand Down Expand Up @@ -55,7 +56,21 @@
_T = TypeVar("_T")


class ObservableDeferred(Generic[_T]):
class AbstractObservableDeferred(Generic[_T], metaclass=abc.ABCMeta):
    """Abstract base class defining the consumer interface of ObservableDeferred"""

    @abc.abstractmethod
    def observe(self) -> "defer.Deferred[_T]":
        """Add a new observer for this ObservableDeferred

        This returns a brand new deferred that is resolved when the underlying
        deferred is resolved. Interacting with the returned deferred does not
        affect the underlying deferred.

        Returns:
            A new deferred which resolves when the underlying deferred resolves.
        """
        ...


class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
"""Wraps a deferred object so that we can add observer deferreds. These
observer deferreds do not affect the callback chain of the original
deferred.
Expand Down
111 changes: 77 additions & 34 deletions synapse/util/caches/response_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Awaitable, Callable, Dict, Generic, Optional, TypeVar
from typing import Any, Awaitable, Callable, Dict, Generic, Iterable, Optional, TypeVar

import attr

from twisted.internet import defer

from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import (
start_active_span,
start_active_span_follows_from,
)
from synapse.util import Clock
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
from synapse.util.caches import register_cache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -54,6 +58,20 @@ class ResponseCacheContext(Generic[KV]):
"""


@attr.s(auto_attribs=True)
class ResponseCacheEntry:
    """A single entry in a ResponseCache: the result plus its tracing context."""

    result: AbstractObservableDeferred
    """The (possibly incomplete) result of the operation.

    Note that we continue to store an ObservableDeferred even after the operation
    completes (rather than switching to an immediate value), since that makes it
    easier to cache Failure results.
    """

    opentracing_span_context: Optional[Any]
    """The context of the opentracing span which generated/is generating the result.

    This is None if no tracing scope was active when the calculation was started.
    """


class ResponseCache(Generic[KV]):
"""
This caches a deferred response. Until the deferred completes it will be
Expand All @@ -63,10 +81,7 @@ class ResponseCache(Generic[KV]):
"""

def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
# This is poorly-named: it includes both complete and incomplete results.
# We keep complete results rather than switching to absolute values because
# that makes it easier to cache Failure results.
self.pending_result_cache: Dict[KV, ObservableDeferred] = {}
self._result_cache: Dict[KV, ResponseCacheEntry] = {}
Comment on lines -66 to +98
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're changing its type (and so have to change all references to it), we might as well fix the name while we're here.


self.clock = clock
self.timeout_sec = timeout_ms / 1000.0
Expand All @@ -75,75 +90,82 @@ def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
self._metrics = register_cache("response_cache", name, self, resizable=False)

def size(self) -> int:
return len(self.pending_result_cache)
return len(self._result_cache)

def __len__(self) -> int:
return self.size()

def get(self, key: KV) -> Optional[defer.Deferred]:
"""Look up the given key.
def keys(self) -> Iterable[KV]:
"""Get the keys currently in the result cache

Returns a new Deferred (which also doesn't follow the synapse
logcontext rules). You will probably want to make_deferred_yieldable the result.
Returns both incomplete entries, and (if the timeout on this cache is non-zero),
complete entries which are still in the cache.

Note that the returned iterator is not safe in the face of concurrent execution:
behaviour is undefined if `wrap` is called during iteration.
"""
return self._result_cache.keys()

If there is no entry for the key, returns None.
def _get(self, key: KV) -> Optional[ResponseCacheEntry]:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`get` was unused outside `wrap`.

"""Look up the given key.

Args:
key: key to get/set in the cache
key: key to get in the cache

Returns:
None if there is no entry for this key; otherwise a deferred which
resolves to the result.
The entry for this key, if any; else None.
"""
result = self.pending_result_cache.get(key)
if result is not None:
entry = self._result_cache.get(key)
if entry is not None:
self._metrics.inc_hits()
return result.observe()
return entry
else:
self._metrics.inc_misses()
return None

def _set(
self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]"
) -> "defer.Deferred[RV]":
self,
context: ResponseCacheContext[KV],
deferred: "defer.Deferred[RV]",
opentracing_span_context: Any,
) -> ResponseCacheEntry:
"""Set the entry for the given key to the given deferred.

*deferred* should run its callbacks in the sentinel logcontext (ie,
you should wrap normal synapse deferreds with
synapse.logging.context.run_in_background).

Returns a new Deferred (which also doesn't follow the synapse logcontext rules).
You will probably want to make_deferred_yieldable the result.
richvdh marked this conversation as resolved.
Show resolved Hide resolved

Args:
context: Information about the cache miss
deferred: The deferred which resolves to the result.
opentracing_span_context: An opentracing span wrapping the calculation

Returns:
A new deferred which resolves to the actual result.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
"""
result = ObservableDeferred(deferred, consumeErrors=True)
key = context.cache_key
self.pending_result_cache[key] = result
entry = ResponseCacheEntry(result, opentracing_span_context)
self._result_cache[key] = entry

def on_complete(r: RV) -> RV:
# if this cache has a non-zero timeout, and the callback has not cleared
# the should_cache bit, we leave it in the cache for now and schedule
# its removal later.
if self.timeout_sec and context.should_cache:
self.clock.call_later(
self.timeout_sec, self.pending_result_cache.pop, key, None
self.timeout_sec, self._result_cache.pop, key, None
)
else:
# otherwise, remove the result immediately.
self.pending_result_cache.pop(key, None)
self._result_cache.pop(key, None)
return r

# make sure we do this *after* adding the entry to pending_result_cache,
# make sure we do this *after* adding the entry to result_cache,
# in case the result is already complete (in which case flipping the order would
# leave us with a stuck entry in the cache).
result.addBoth(on_complete)
return result.observe()
return entry

async def wrap(
self,
Expand Down Expand Up @@ -189,20 +211,41 @@ async def handle_request(request):
Returns:
The result of the callback (from the cache, or otherwise)
"""
result = self.get(key)
if not result:
entry = self._get(key)
if not entry:
logger.debug(
"[%s]: no cached result for [%s], calculating new one", self._name, key
)
context = ResponseCacheContext(cache_key=key)
if cache_context:
kwargs["cache_context"] = context
d = run_in_background(callback, *args, **kwargs)
result = self._set(context, d)
elif not isinstance(result, defer.Deferred) or result.called:

span_context = None

async def cb() -> RV:
# NB it is important that we do not `await` before setting span_context!
nonlocal span_context
with start_active_span(
f"ResponseCache[{self._name}].calculate"
) as scope:
if scope:
span_context = scope.span.context
return await callback(*args, **kwargs)

d = run_in_background(cb)
entry = self._set(context, d, span_context)
return await make_deferred_yieldable(entry.result.observe())

result = entry.result.observe()
if result.called:
logger.info("[%s]: using completed cached result for [%s]", self._name, key)
else:
logger.info(
"[%s]: using incomplete cached result for [%s]", self._name, key
)
return await make_deferred_yieldable(result)

with start_active_span_follows_from(
f"ResponseCache[{self._name}].wait",
contexts=[entry.opentracing_span_context],
):
return await make_deferred_yieldable(result)