From 1f6e744a2b14856a2550fb28fdb83e3874d5f141 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 20 Dec 2021 12:33:25 +0000 Subject: [PATCH 1/9] Add a `ResponseCache.keys` method ... so that we don't have to gut-wrench into the object. --- synapse/handlers/room.py | 2 +- synapse/util/caches/response_cache.py | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ead2198e14fe..b9c1cbffa5c5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -172,7 +172,7 @@ async def upgrade_room( user_id = requester.user.to_string() # Check if this room is already being upgraded by another person - for key in self._upgrade_response_cache.pending_result_cache: + for key in self._upgrade_response_cache.keys(): if key[0] == old_room_id and key[1] != user_id: # Two different people are trying to upgrade the same room. # Send the second an error. diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 88ccf443377c..e61dcaeba516 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Any, Awaitable, Callable, Dict, Generic, Optional, TypeVar +from typing import Any, Awaitable, Callable, Dict, Generic, Iterable, Optional, TypeVar import attr @@ -80,6 +80,17 @@ def size(self) -> int: def __len__(self) -> int: return self.size() + def keys(self) -> Iterable[KV]: + """Get the keys currently in the result cache + + Returns both incomplete entries, and (if the timeout on this cache is non-zero), + complete entries which are still in the cache. + + Note that the returned iterator is not safe in the face of concurrent execution: + behaviour is undefined if `wrap` is called during iteration. + """ + return self.pending_result_cache.keys() + def get(self, key: KV) -> Optional[defer.Deferred]: """Look up the given key. From e51c966b067d0e2b6253cd3fa00f4955ceb956dc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Dec 2021 17:17:57 +0000 Subject: [PATCH 2/9] Wrap ResponseCache entries in an attrs class --- synapse/util/async_helpers.py | 17 ++++++- synapse/util/caches/response_cache.py | 67 ++++++++++++++------------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 20ce294209ad..ecd42448839b 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc import collections import inspect import itertools @@ -55,7 +56,21 @@ _T = TypeVar("_T") -class ObservableDeferred(Generic[_T]): +class AbstractObservableDeferred(Generic[_T], metaclass=abc.ABCMeta): + """Abstract base class defining the consumer interface of ObservableDeferred""" + + @abc.abstractmethod + def observe(self) -> "defer.Deferred[_T]": + """Add a new observer for this ObservableDeferred + + This returns a brand new deferred that is resolved when the underlying + deferred is resolved. Interacting with the returned deferred does not + effect the underlying deferred. + """ + ... + + +class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]): """Wraps a deferred object so that we can add observer deferreds. These observer deferreds do not affect the callback chain of the original deferred. diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index e61dcaeba516..27188e0ab7ad 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -20,7 +20,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.util import Clock -from synapse.util.async_helpers import ObservableDeferred +from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred from synapse.util.caches import register_cache logger = logging.getLogger(__name__) @@ -54,6 +54,17 @@ class ResponseCacheContext(Generic[KV]): """ +@attr.s(auto_attribs=True) +class ResponseCacheEntry: + result: AbstractObservableDeferred + """The (possibly incomplete) result of the operation. + + Note that we continue to store an ObservableDeferred even after the operation + completes (rather than switching to an immediate value), since that makes it + easier to cache Failure results. + """ + + class ResponseCache(Generic[KV]): """ This caches a deferred response. Until the deferred completes it will be @@ -63,10 +74,7 @@ class ResponseCache(Generic[KV]): """ def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): - # This is poorly-named: it includes both complete and incomplete results. - # We keep complete results rather than switching to absolute values because - # that makes it easier to cache Failure results. - self.pending_result_cache: Dict[KV, ObservableDeferred] = {} + self._result_cache: Dict[KV, ResponseCacheEntry] = {} self.clock = clock self.timeout_sec = timeout_ms / 1000.0 @@ -75,7 +83,7 @@ def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): self._metrics = register_cache("response_cache", name, self, resizable=False) def size(self) -> int: - return len(self.pending_result_cache) + return len(self._result_cache) def __len__(self) -> int: return self.size() @@ -89,43 +97,34 @@ def keys(self) -> Iterable[KV]: Note that the returned iterator is not safe in the face of concurrent execution: behaviour is undefined if `wrap` is called during iteration. """ - return self.pending_result_cache.keys() + return self._result_cache.keys() - def get(self, key: KV) -> Optional[defer.Deferred]: + def _get(self, key: KV) -> Optional[ResponseCacheEntry]: """Look up the given key. - Returns a new Deferred (which also doesn't follow the synapse - logcontext rules). You will probably want to make_deferred_yieldable the result. - - If there is no entry for the key, returns None. - Args: - key: key to get/set in the cache + key: key to get in the cache Returns: - None if there is no entry for this key; otherwise a deferred which - resolves to the result. + The entry for this key, if any; else None. """ - result = self.pending_result_cache.get(key) - if result is not None: + entry = self._result_cache.get(key) + if entry is not None: self._metrics.inc_hits() - return result.observe() + return entry else: self._metrics.inc_misses() return None def _set( self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]" - ) -> "defer.Deferred[RV]": + ) -> ResponseCacheEntry: """Set the entry for the given key to the given deferred. *deferred* should run its callbacks in the sentinel logcontext (ie, you should wrap normal synapse deferreds with synapse.logging.context.run_in_background). - Returns a new Deferred (which also doesn't follow the synapse logcontext rules). - You will probably want to make_deferred_yieldable the result. - Args: context: Information about the cache miss deferred: The deferred which resolves to the result. @@ -135,7 +134,8 @@ def _set( """ result = ObservableDeferred(deferred, consumeErrors=True) key = context.cache_key - self.pending_result_cache[key] = result + entry = ResponseCacheEntry(result) + self._result_cache[key] = entry def on_complete(r: RV) -> RV: # if this cache has a non-zero timeout, and the callback has not cleared @@ -143,18 +143,18 @@ def on_complete(r: RV) -> RV: # its removal later. if self.timeout_sec and context.should_cache: self.clock.call_later( - self.timeout_sec, self.pending_result_cache.pop, key, None + self.timeout_sec, self._result_cache.pop, key, None ) else: # otherwise, remove the result immediately. - self.pending_result_cache.pop(key, None) + self._result_cache.pop(key, None) return r - # make sure we do this *after* adding the entry to pending_result_cache, + # make sure we do this *after* adding the entry to result_cache, # in case the result is already complete (in which case flipping the order would # leave us with a stuck entry in the cache). result.addBoth(on_complete) - return result.observe() + return entry async def wrap( self, @@ -200,8 +200,8 @@ async def handle_request(request): Returns: The result of the callback (from the cache, or otherwise) """ - result = self.get(key) - if not result: + entry = self._get(key) + if not entry: logger.debug( "[%s]: no cached result for [%s], calculating new one", self._name, key ) @@ -209,8 +209,11 @@ async def handle_request(request): if cache_context: kwargs["cache_context"] = context d = run_in_background(callback, *args, **kwargs) - result = self._set(context, d) - elif not isinstance(result, defer.Deferred) or result.called: + entry = self._set(context, d) + return await make_deferred_yieldable(entry.result.observe()) + + result = entry.result.observe() + if result.called: logger.info("[%s]: using completed cached result for [%s]", self._name, key) else: logger.info( From 0d1dcb9a4764768a2f3e9a90b4e05af6d847ce1d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Dec 2021 17:55:08 +0000 Subject: [PATCH 3/9] Wrap ResponseCache operations in opentracing spans --- synapse/util/caches/response_cache.py | 39 +++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 27188e0ab7ad..9f29f3a835a6 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -19,6 +19,10 @@ from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.opentracing import ( + start_active_span, + start_active_span_follows_from, +) from synapse.util import Clock from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred from synapse.util.caches import register_cache @@ -64,6 +68,9 @@ class ResponseCacheEntry: easier to cache Failure results. """ + opentracing_span_context: Any + """The opentracing span which generated/is generating the result""" + class ResponseCache(Generic[KV]): """ @@ -117,7 +124,10 @@ def _get(self, key: KV) -> Optional[ResponseCacheEntry]: return None def _set( - self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]" + self, + context: ResponseCacheContext[KV], + deferred: "defer.Deferred[RV]", + opentracing_span_context: Any, ) -> ResponseCacheEntry: """Set the entry for the given key to the given deferred. @@ -128,13 +138,14 @@ def _set( Args: context: Information about the cache miss deferred: The deferred which resolves to the result. + opentracing_span_context: An opentracing span wrapping the calculation Returns: A new deferred which resolves to the actual result. """ result = ObservableDeferred(deferred, consumeErrors=True) key = context.cache_key - entry = ResponseCacheEntry(result) + entry = ResponseCacheEntry(result, opentracing_span_context) self._result_cache[key] = entry def on_complete(r: RV) -> RV: @@ -208,8 +219,21 @@ async def handle_request(request): context = ResponseCacheContext(cache_key=key) if cache_context: kwargs["cache_context"] = context - d = run_in_background(callback, *args, **kwargs) - entry = self._set(context, d) + + span_context = None + + async def cb() -> RV: + # NB it is important that we do not `await` before setting span_context! + nonlocal span_context + with start_active_span( + f"ResponseCache[{self._name}].calculate" + ) as scope: + if scope: + span_context = scope.span.context + return await callback(*args, **kwargs) + + d = run_in_background(cb) + entry = self._set(context, d, span_context) return await make_deferred_yieldable(entry.result.observe()) result = entry.result.observe() @@ -219,4 +243,9 @@ async def handle_request(request): logger.info( "[%s]: using incomplete cached result for [%s]", self._name, key ) - return await make_deferred_yieldable(result) + + with start_active_span_follows_from( + f"ResponseCache[{self._name}].wait", + contexts=[entry.opentracing_span_context], + ): + return await make_deferred_yieldable(result) From bee20eb892411b500e397656a9336b95ef8b4371 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 20 Dec 2021 12:19:17 +0000 Subject: [PATCH 4/9] changelog --- changelog.d/11607.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11607.misc diff --git a/changelog.d/11607.misc b/changelog.d/11607.misc new file mode 100644 index 000000000000..e82f46776364 --- /dev/null +++ b/changelog.d/11607.misc @@ -0,0 +1 @@ +Improve opentracing support for requests which use a `ResponseCache`. From f71e964f8590db554c2a5f9d5a709f4130cdc00f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 20 Dec 2021 14:41:59 +0000 Subject: [PATCH 5/9] fix docstring --- synapse/util/caches/response_cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 9f29f3a835a6..3d9ae53acbec 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -141,7 +141,7 @@ def _set( opentracing_span_context: An opentracing span wrapping the calculation Returns: - A new deferred which resolves to the actual result. + The cache entry object. """ result = ObservableDeferred(deferred, consumeErrors=True) key = context.cache_key From c302d93897edf209891bee7f61d0a4e7efd68e02 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 20 Dec 2021 15:09:32 +0000 Subject: [PATCH 6/9] Fix behaviour when tracing is not enabled `start_active_span` always returns a truthy result, so we need to go further round the houses to get the span. --- synapse/util/caches/response_cache.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 3d9ae53acbec..e81d72f68889 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -20,6 +20,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import ( + active_span, start_active_span, start_active_span_follows_from, ) @@ -225,11 +226,10 @@ async def handle_request(request): async def cb() -> RV: # NB it is important that we do not `await` before setting span_context! nonlocal span_context - with start_active_span( - f"ResponseCache[{self._name}].calculate" - ) as scope: - if scope: - span_context = scope.span.context + with start_active_span(f"ResponseCache[{self._name}].calculate"): + span = active_span() + if span: + span_context = span.context return await callback(*args, **kwargs) d = run_in_background(cb) @@ -244,8 +244,9 @@ async def cb() -> RV: "[%s]: using incomplete cached result for [%s]", self._name, key ) + span_context = entry.opentracing_span_context with start_active_span_follows_from( f"ResponseCache[{self._name}].wait", - contexts=[entry.opentracing_span_context], + contexts=(span_context,) if span_context else (), ): return await make_deferred_yieldable(result) From 068ec3e7493ab5dd09491f194ab17abf9b16ff6c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 20 Dec 2021 15:11:04 +0000 Subject: [PATCH 7/9] better typing --- synapse/util/caches/response_cache.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index e81d72f68889..a3eb5f741bfc 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -12,7 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Any, Awaitable, Callable, Dict, Generic, Iterable, Optional, TypeVar +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + Generic, + Iterable, + Optional, + TypeVar, +) import attr @@ -30,6 +40,9 @@ logger = logging.getLogger(__name__) +if TYPE_CHECKING: + import opentracing + # the type of the key in the cache KV = TypeVar("KV") @@ -69,7 +82,7 @@ class ResponseCacheEntry: easier to cache Failure results. """ - opentracing_span_context: Any + opentracing_span_context: "Optional[opentracing.SpanContext]" """The opentracing span which generated/is generating the result""" @@ -128,7 +141,7 @@ def _set( self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]", - opentracing_span_context: Any, + opentracing_span_context: "Optional[opentracing.SpanContext]", ) -> ResponseCacheEntry: """Set the entry for the given key to the given deferred. @@ -221,7 +234,7 @@ async def handle_request(request): if cache_context: kwargs["cache_context"] = context - span_context = None + span_context: Optional[opentracing.SpanContext] = None async def cb() -> RV: # NB it is important that we do not `await` before setting span_context! From d4033b2be6eae2eda9fd5791f7efd80f396963d2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 20 Dec 2021 15:33:57 +0000 Subject: [PATCH 8/9] Update unit tests `ResponseCache.get` is no longer part of the public interface, so let's avoid using it in the unit tests. --- tests/util/caches/test_response_cache.py | 45 ++++++++++++++++++------ 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/tests/util/caches/test_response_cache.py b/tests/util/caches/test_response_cache.py index 1e83ef2f33d5..025b73e32f90 100644 --- a/tests/util/caches/test_response_cache.py +++ b/tests/util/caches/test_response_cache.py @@ -11,6 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from unittest.mock import Mock + from parameterized import parameterized from twisted.internet import defer @@ -60,10 +63,15 @@ def test_cache_hit(self): self.successResultOf(wrap_d), "initial wrap result should be the same", ) + + # a second call should return the result without a call to the wrapped function + unexpected = Mock(spec=()) + wrap2_d = defer.ensureDeferred(cache.wrap(0, unexpected)) + unexpected.assert_not_called() self.assertEqual( expected_result, - self.successResultOf(cache.get(0)), - "cache should have the result", + self.successResultOf(wrap2_d), + "cache should still have the result", ) def test_cache_miss(self): @@ -80,7 +88,7 @@ def test_cache_miss(self): self.successResultOf(wrap_d), "initial wrap result should be the same", ) - self.assertIsNone(cache.get(0), "cache should not have the result now") + self.assertCountEqual([], cache.keys(), "cache should not have the result now") def test_cache_expire(self): cache = self.with_cache("short_cache", ms=1000) @@ -92,16 +100,20 @@ def test_cache_expire(self): ) self.assertEqual(expected_result, self.successResultOf(wrap_d)) + + # a second call should return the result without a call to the wrapped function + unexpected = Mock(spec=()) + wrap2_d = defer.ensureDeferred(cache.wrap(0, unexpected)) + unexpected.assert_not_called() self.assertEqual( expected_result, - self.successResultOf(cache.get(0)), + self.successResultOf(wrap2_d), "cache should still have the result", ) # cache eviction timer is handled self.reactor.pump((2,)) - - self.assertIsNone(cache.get(0), "cache should not have the result now") + self.assertCountEqual([], cache.keys(), "cache should not have the result now") def test_cache_wait_hit(self): cache = self.with_cache("neutral_cache") @@ -133,16 +145,21 @@ def test_cache_wait_expire(self): self.reactor.pump((1, 1)) self.assertEqual(expected_result, self.successResultOf(wrap_d)) + + # a second call should immediately return the result without a call to the + # wrapped function + unexpected = Mock(spec=()) + wrap2_d = defer.ensureDeferred(cache.wrap(0, unexpected)) + unexpected.assert_not_called() self.assertEqual( expected_result, - self.successResultOf(cache.get(0)), + self.successResultOf(wrap2_d), "cache should still have the result", ) # (1 + 1 + 2) > 3.0, cache eviction timer is handled self.reactor.pump((2,)) - - self.assertIsNone(cache.get(0), "cache should not have the result now") + self.assertCountEqual([], cache.keys(), "cache should not have the result now") @parameterized.expand([(True,), (False,)]) def test_cache_context_nocache(self, should_cache: bool): @@ -183,10 +200,16 @@ async def non_caching(o: str, cache_context: ResponseCacheContext[int]): self.assertEqual(expected_result, self.successResultOf(wrap2_d)) if should_cache: + unexpected = Mock(spec=()) + wrap3_d = defer.ensureDeferred(cache.wrap(0, unexpected)) + unexpected.assert_not_called() self.assertEqual( expected_result, - self.successResultOf(cache.get(0)), + self.successResultOf(wrap3_d), "cache should still have the result", ) + else: - self.assertIsNone(cache.get(0), "cache should not have the result") + self.assertCountEqual( + [], cache.keys(), "cache should not have the result now" + ) From 6a89434e0d2eb1f6f1e599fdfd8b30c16b056760 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 20 Dec 2021 17:43:13 +0000 Subject: [PATCH 9/9] Fixes to `AbstractObservableDeferred` 1. We need to define `__slots__`, to keep `ObservableDeferred` slotty. 2. Fix up the docstring to remind users to `make_deferred_yieldable`. --- synapse/util/async_helpers.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index ecd42448839b..1548c5fd5786 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -59,6 +59,8 @@ class AbstractObservableDeferred(Generic[_T], metaclass=abc.ABCMeta): """Abstract base class defining the consumer interface of ObservableDeferred""" + __slots__ = () + @abc.abstractmethod def observe(self) -> "defer.Deferred[_T]": """Add a new observer for this ObservableDeferred @@ -66,6 +68,9 @@ def observe(self) -> "defer.Deferred[_T]": This returns a brand new deferred that is resolved when the underlying deferred is resolved. Interacting with the returned deferred does not effect the underlying deferred. + + Note that the returned Deferred doesn't follow the Synapse logcontext rules - + you will probably want to `make_deferred_yieldable` it. """ ...