Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up responding to media requests #17558

Merged
merged 4 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17558.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up responding to media requests.
4 changes: 1 addition & 3 deletions synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
from synapse.types import ISynapseReactor
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
from synapse.util.cancellation import is_function_cancellable
Expand Down Expand Up @@ -869,8 +868,7 @@ def encode(opentracing_span: "Optional[opentracing.Span]") -> bytes:

with start_active_span("encode_json_response"):
span = active_span()
reactor: ISynapseReactor = request.reactor # type: ignore
json_str = await defer_to_thread(reactor, encode, span)
json_str = await defer_to_thread(request.reactor, encode, span)

_write_bytes_to_request(request, json_str)

Expand Down
2 changes: 1 addition & 1 deletion synapse/http/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ def __init__(
)

self.site_tag = site_tag
self.reactor = reactor
self.reactor: ISynapseReactor = reactor

assert config.http_options is not None
proxied = config.http_options.x_forwarded
Expand Down
140 changes: 137 additions & 3 deletions synapse/media/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

import logging
import os
import threading
import urllib
from abc import ABC, abstractmethod
from types import TracebackType
from typing import (
TYPE_CHECKING,
Awaitable,
BinaryIO,
Dict,
Generator,
List,
Expand All @@ -37,15 +39,19 @@
)

import attr
from zope.interface import implementer

from twisted.internet import interfaces
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IConsumer
from twisted.protocols.basic import FileSender
from twisted.python.failure import Failure
from twisted.web.server import Request

from synapse.api.errors import Codes, cs_error
from synapse.http.server import finish_request, respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.context import defer_to_thread, make_deferred_yieldable
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.stringutils import is_ascii

Expand Down Expand Up @@ -138,7 +144,7 @@ async def respond_with_file(
add_file_headers(request, media_type, file_size, upload_name)

with open(file_path, "rb") as f:
await make_deferred_yieldable(FileSender().beginFileTransfer(f, request))
await ThreadedFileSender(request.reactor).beginFileTransfer(f, request)

finish_request(request)
else:
Expand Down Expand Up @@ -601,3 +607,131 @@ def _parseparam(s: bytes) -> Generator[bytes, None, None]:
f = s[:end]
yield f.strip()
s = s[end:]


@implementer(interfaces.IPushProducer)
class ThreadedFileSender:
    """
    A producer that sends the contents of a file to a consumer, reading from the
    file on a thread.

    This works by spawning a loop in a threadpool that repeatedly reads from the
    file and sends it to the consumer. The main thread communicates with the
    loop via two `threading.Event`s, which control when to start/pause reading
    and when to terminate.

    All interaction with the consumer and the deferred (`_write`, `_error`,
    `_finish`) is scheduled from the read loop via `reactor.callFromThread`
    rather than being done directly on the worker thread.
    """

    # How much data to read in one go.
    CHUNK_SIZE = 2**14

    # How long we wait for the consumer to be ready again before aborting the
    # read.
    TIMEOUT_SECONDS = 90.0

    def __init__(self, reactor: ISynapseReactor) -> None:
        """
        Args:
            reactor: The reactor used both to run the read loop on a thread
                and to schedule the callbacks coming back from that thread.
        """
        self.reactor = reactor

        self.file: Optional[BinaryIO] = None
        self.deferred: "Deferred[None]" = Deferred()
        self.consumer: Optional[interfaces.IConsumer] = None

        # Signals if the thread should keep reading/sending data. Set means
        # continue, clear means pause.
        self.wakeup_event = threading.Event()

        # Signals if the thread should terminate, e.g. because the consumer has
        # gone away. Both this and `wakeup_event` should be set to terminate the
        # loop (otherwise the thread will block on `wakeup_event`).
        self.stop_event = threading.Event()

    def beginFileTransfer(
        self, file: BinaryIO, consumer: interfaces.IConsumer
    ) -> "Deferred[None]":
        """
        Begin transferring a file.

        Registers this object as a streaming (push) producer on `consumer` and
        starts the read loop on a thread.

        Args:
            file: The open binary file to read from. It is closed once the
                read loop has finished.
            consumer: The consumer to write the file contents to.

        Returns:
            A deferred that resolves with None once the read loop has finished,
            or fails if reading raised, the consumer stayed paused for longer
            than `TIMEOUT_SECONDS`, or the consumer asked us to stop.
        """
        self.file = file
        self.consumer = consumer

        # `True` marks us as a streaming producer, i.e. the consumer drives us
        # through `pauseProducing`/`resumeProducing`.
        self.consumer.registerProducer(self, True)

        # We set the wakeup signal as we should start producing immediately.
        self.wakeup_event.set()
        defer_to_thread(self.reactor, self._on_thread_read_loop)

        return make_deferred_yieldable(self.deferred)

    def resumeProducing(self) -> None:
        """interfaces.IPushProducer"""
        self.wakeup_event.set()

    def pauseProducing(self) -> None:
        """interfaces.IPushProducer"""
        self.wakeup_event.clear()

    def stopProducing(self) -> None:
        """interfaces.IPushProducer"""

        # Unregister and forget the consumer so that we never interact with it
        # again: chunks that the thread has already queued via
        # `callFromThread` would otherwise still be written to a consumer that
        # asked us to stop, and `_finish` would try to unregister from it a
        # second time.
        if self.consumer:
            self.consumer.unregisterProducer()
            self.consumer = None

        # Terminate the thread loop. `wakeup_event` must be set as well, as
        # the loop may currently be blocked waiting on it.
        self.wakeup_event.set()
        self.stop_event.set()

        if not self.deferred.called:
            self.deferred.errback(Exception("Consumer asked us to stop producing"))

    def _on_thread_read_loop(self) -> None:
        """This is the loop that happens on a thread."""

        try:
            # The file should always have been set before we get here.
            file = self.file
            assert file is not None

            while not self.stop_event.is_set():
                # We wait for the producer to signal that the consumer wants
                # more data (or we should abort). `wait` returns immediately
                # if the event is already set, and False on timeout.
                if not self.wakeup_event.wait(self.TIMEOUT_SECONDS):
                    raise Exception("Timed out waiting to resume")

                # Check if we were woken up so that we abort the download
                if self.stop_event.is_set():
                    return

                chunk = file.read(self.CHUNK_SIZE)
                if not chunk:
                    # EOF: nothing left to send.
                    return

                self.reactor.callFromThread(self._write, chunk)

        except Exception:
            # We deliberately don't log here: the failure is passed to the
            # deferred and should get logged when the error from the deferred
            # is logged, so logging here as well would duplicate log lines.
            self.reactor.callFromThread(self._error, Failure())
        finally:
            # Always clean up, whether we hit EOF, were stopped or failed.
            self.reactor.callFromThread(self._finish)

    def _write(self, chunk: bytes) -> None:
        """Called from the thread to write a chunk of data"""
        # The consumer is dropped on error/stop/finish; silently discard any
        # chunks that were queued before that happened.
        if self.consumer:
            self.consumer.write(chunk)

    def _error(self, failure: Failure) -> None:
        """Called from the thread when there was a fatal error"""
        if self.consumer:
            self.consumer.unregisterProducer()
            self.consumer = None

        if not self.deferred.called:
            self.deferred.errback(failure)

    def _finish(self) -> None:
        """Called from the thread when it finishes (either on success or
        failure)."""
        if self.file:
            self.file.close()
            # Drop the reference so nothing can try to use the closed file.
            self.file = None

        if self.consumer:
            self.consumer.unregisterProducer()
            self.consumer = None

        # No-op if `_error` or `stopProducing` already failed the deferred.
        if not self.deferred.called:
            self.deferred.callback(None)
19 changes: 8 additions & 11 deletions synapse/media/media_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,15 @@
from twisted.internet import interfaces
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IConsumer
from twisted.protocols.basic import FileSender

from synapse.api.errors import NotFoundError
from synapse.logging.context import (
defer_to_thread,
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.context import defer_to_thread, run_in_background
from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
from synapse.media._base import ThreadedFileSender
from synapse.util import Clock
from synapse.util.file_consumer import BackgroundFileConsumer

from ..types import JsonDict
from ..types import ISynapseReactor, JsonDict
from ._base import FileInfo, Responder
from .filepath import MediaFilePaths

Expand Down Expand Up @@ -213,7 +209,7 @@ async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
local_path = os.path.join(self.local_media_directory, path)
if os.path.exists(local_path):
logger.debug("responding with local file %s", local_path)
return FileResponder(open(local_path, "rb"))
return FileResponder(self.reactor, open(local_path, "rb"))
logger.debug("local file %s did not exist", local_path)

for provider in self.storage_providers:
Expand Down Expand Up @@ -336,12 +332,13 @@ class FileResponder(Responder):
is closed when finished streaming.
"""

def __init__(self, open_file: IO):
def __init__(self, reactor: ISynapseReactor, open_file: BinaryIO):
self.reactor = reactor
self.open_file = open_file

def write_to_consumer(self, consumer: IConsumer) -> Deferred:
return make_deferred_yieldable(
FileSender().beginFileTransfer(self.open_file, consumer)
return ThreadedFileSender(self.reactor).beginFileTransfer(
self.open_file, consumer
)

def __exit__(
Expand Down
5 changes: 3 additions & 2 deletions synapse/media/storage_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class FileStorageProviderBackend(StorageProvider):

def __init__(self, hs: "HomeServer", config: str):
self.hs = hs
self.reactor = hs.get_reactor()
self.cache_directory = hs.config.media.media_store_path
self.base_directory = config

Expand All @@ -165,7 +166,7 @@ async def store_file(self, path: str, file_info: FileInfo) -> None:
shutil_copyfile: Callable[[str, str], str] = shutil.copyfile
with start_active_span("shutil_copyfile"):
await defer_to_thread(
self.hs.get_reactor(),
self.reactor,
shutil_copyfile,
primary_fname,
backup_fname,
Expand All @@ -177,7 +178,7 @@ async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:

backup_fname = os.path.join(self.base_directory, path)
if os.path.isfile(backup_fname):
return FileResponder(open(backup_fname, "rb"))
return FileResponder(self.reactor, open(backup_fname, "rb"))

return None

Expand Down
3 changes: 2 additions & 1 deletion synapse/media/thumbnailer.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def __init__(
media_storage: MediaStorage,
):
self.hs = hs
self.reactor = hs.get_reactor()
self.media_repo = media_repo
self.media_storage = media_storage
self.store = hs.get_datastores().main
Expand Down Expand Up @@ -373,7 +374,7 @@ async def select_or_generate_local_thumbnail(
await respond_with_multipart_responder(
self.hs.get_clock(),
request,
FileResponder(open(file_path, "rb")),
FileResponder(self.reactor, open(file_path, "rb")),
media_info,
)
else:
Expand Down
Loading