Skip to content

Commit

Permalink
Handle remote download responses with UNKNOWN_LENGTH more gracefully (
Browse files Browse the repository at this point in the history
#17439)

Prior to this PR, remote downloads which did not provide a
`content-length` were decremented from the remote download ratelimiter
at the max allowable size, leading to excessive ratelimiting - see
#17394.

This PR adds a linearizer to limit concurrent remote downloads to 6 per
IP address, and decrements remote downloads without a `content-length`
from the ratelimiter *after* the download is complete and the response
length is known.

Also adds logic to ensure that responses with a known length respect the
`max_download_size`.
  • Loading branch information
H-Shay authored Jul 16, 2024
1 parent 9e1acea commit 429ecb7
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 60 deletions.
1 change: 1 addition & 0 deletions changelog.d/17439.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Limit concurrent remote downloads to 6 per IP address, and decrement remote downloads without a content-length from the ratelimiter after the download is complete.
126 changes: 80 additions & 46 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
from synapse.util.metrics import Measure
from synapse.util.stringutils import parse_and_validate_server_name

Expand Down Expand Up @@ -475,6 +475,8 @@ def __init__(
use_proxy=True,
)

self.remote_download_linearizer = Linearizer("remote_download_linearizer", 6)

def wake_destination(self, destination: str) -> None:
"""Called when the remote server may have come back online."""

Expand Down Expand Up @@ -1486,35 +1488,44 @@ async def get_file(
)

headers = dict(response.headers.getAllRawHeaders())

expected_size = response.length
# if we don't get an expected length then use the max length

if expected_size == UNKNOWN_LENGTH:
expected_size = max_size
logger.debug(
f"File size unknown, assuming file is max allowable size: {max_size}"
)
else:
if int(expected_size) > max_size:
msg = "Requested file is too large > %r bytes" % (max_size,)
logger.warning(
"{%s} [%s] %s",
request.txn_id,
request.destination,
msg,
)
raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)

read_body, _ = await download_ratelimiter.can_do_action(
requester=None,
key=ip_address,
n_actions=expected_size,
)
if not read_body:
msg = "Requested file size exceeds ratelimits"
logger.warning(
"{%s} [%s] %s",
request.txn_id,
request.destination,
msg,
read_body, _ = await download_ratelimiter.can_do_action(
requester=None,
key=ip_address,
n_actions=expected_size,
)
raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
if not read_body:
msg = "Requested file size exceeds ratelimits"
logger.warning(
"{%s} [%s] %s",
request.txn_id,
request.destination,
msg,
)
raise SynapseError(
HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
)

try:
# add a byte of headroom to max size as function errs at >=
d = read_body_with_max_size(response, output_stream, expected_size + 1)
d.addTimeout(self.default_timeout_seconds, self.reactor)
length = await make_deferred_yieldable(d)
async with self.remote_download_linearizer.queue(ip_address):
# add a byte of headroom to max size as function errs at >=
d = read_body_with_max_size(response, output_stream, expected_size + 1)
d.addTimeout(self.default_timeout_seconds, self.reactor)
length = await make_deferred_yieldable(d)
except BodyExceededMaxSize:
msg = "Requested file is too large > %r bytes" % (expected_size,)
logger.warning(
Expand Down Expand Up @@ -1560,6 +1571,13 @@ async def get_file(
request.method,
request.uri.decode("ascii"),
)

# if we didn't know the length upfront, decrement the actual size from ratelimiter
if response.length == UNKNOWN_LENGTH:
download_ratelimiter.record_action(
requester=None, key=ip_address, n_actions=length
)

return length, headers

async def federation_get_file(
Expand Down Expand Up @@ -1630,29 +1648,37 @@ async def federation_get_file(
)

headers = dict(response.headers.getAllRawHeaders())

expected_size = response.length
# if we don't get an expected length then use the max length

if expected_size == UNKNOWN_LENGTH:
expected_size = max_size
logger.debug(
f"File size unknown, assuming file is max allowable size: {max_size}"
)
else:
if int(expected_size) > max_size:
msg = "Requested file is too large > %r bytes" % (max_size,)
logger.warning(
"{%s} [%s] %s",
request.txn_id,
request.destination,
msg,
)
raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)

read_body, _ = await download_ratelimiter.can_do_action(
requester=None,
key=ip_address,
n_actions=expected_size,
)
if not read_body:
msg = "Requested file size exceeds ratelimits"
logger.warning(
"{%s} [%s] %s",
request.txn_id,
request.destination,
msg,
read_body, _ = await download_ratelimiter.can_do_action(
requester=None,
key=ip_address,
n_actions=expected_size,
)
raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
if not read_body:
msg = "Requested file size exceeds ratelimits"
logger.warning(
"{%s} [%s] %s",
request.txn_id,
request.destination,
msg,
)
raise SynapseError(
HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
)

# this should be a multipart/mixed response with the boundary string in the header
try:
Expand All @@ -1672,11 +1698,12 @@ async def federation_get_file(
raise SynapseError(HTTPStatus.BAD_GATEWAY, msg)

try:
# add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
deferred = read_multipart_response(
response, output_stream, boundary, expected_size + 1
)
deferred.addTimeout(self.default_timeout_seconds, self.reactor)
async with self.remote_download_linearizer.queue(ip_address):
# add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
deferred = read_multipart_response(
response, output_stream, boundary, expected_size + 1
)
deferred.addTimeout(self.default_timeout_seconds, self.reactor)
except BodyExceededMaxSize:
msg = "Requested file is too large > %r bytes" % (expected_size,)
logger.warning(
Expand Down Expand Up @@ -1743,6 +1770,13 @@ async def federation_get_file(
request.method,
request.uri.decode("ascii"),
)

# if we didn't know the length upfront, decrement the actual size from ratelimiter
if response.length == UNKNOWN_LENGTH:
download_ratelimiter.record_action(
requester=None, key=ip_address, n_actions=length
)

return length, headers, multipart_response.json


Expand Down
49 changes: 40 additions & 9 deletions tests/media/test_media_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,13 +1057,15 @@ async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
)
assert channel.code == 200

@override_config({"remote_media_download_burst_count": "87M"})
@patch(
"synapse.http.matrixfederationclient.read_body_with_max_size",
read_body_with_max_size_30MiB,
)
def test_download_ratelimit_max_size_sub(self) -> None:
def test_download_ratelimit_unknown_length(self) -> None:
"""
Test that if no content-length is provided, the default max size is applied instead
Test that if no content-length is provided, ratelimit will still be applied after
download once length is known
"""

# mock out actually sending the request
Expand All @@ -1077,19 +1079,48 @@ async def _send_request(*args: Any, **kwargs: Any) -> IResponse:

self.client._send_request = _send_request # type: ignore

# ten requests should go through using the max size (500MB/50MB)
for i in range(10):
channel2 = self.make_request(
# 3 requests should go through (note 3rd one would technically violate ratelimit but
# is applied *after* download - the next one will be ratelimited)
for i in range(3):
channel = self.make_request(
"GET",
f"/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy{i}",
shorthand=False,
)
assert channel2.code == 200
assert channel.code == 200

# eleventh will hit ratelimit
channel3 = self.make_request(
# 4th will hit ratelimit
channel2 = self.make_request(
"GET",
"/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyx",
shorthand=False,
)
assert channel3.code == 429
assert channel2.code == 429

@override_config({"max_upload_size": "29M"})
@patch(
"synapse.http.matrixfederationclient.read_body_with_max_size",
read_body_with_max_size_30MiB,
)
def test_max_download_respected(self) -> None:
"""
Test that the max download size is enforced - note that max download size is determined
by the max_upload_size
"""

# mock out actually sending the request
async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
resp = MagicMock(spec=IResponse)
resp.code = 200
resp.length = 31457280
resp.headers = Headers({"Content-Type": ["application/octet-stream"]})
resp.phrase = b"OK"
return resp

self.client._send_request = _send_request # type: ignore

channel = self.make_request(
"GET", "/_matrix/media/v3/download/remote.org/abcd", shorthand=False
)
assert channel.code == 502
assert channel.json_body["errcode"] == "M_TOO_LARGE"
50 changes: 45 additions & 5 deletions tests/rest/client/test_media.py
Original file line number Diff line number Diff line change
Expand Up @@ -1809,13 +1809,19 @@ async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
)
assert channel.code == 200

@override_config(
{
"remote_media_download_burst_count": "87M",
}
)
@patch(
"synapse.http.matrixfederationclient.read_multipart_response",
read_multipart_response_30MiB,
)
def test_download_ratelimit_max_size_sub(self) -> None:
def test_download_ratelimit_unknown_length(self) -> None:
"""
Test that if no content-length is provided, the default max size is applied instead
Test that if no content-length is provided, ratelimiting is still applied after
media is downloaded and length is known
"""

# mock out actually sending the request
Expand All @@ -1831,8 +1837,9 @@ async def _send_request(*args: Any, **kwargs: Any) -> IResponse:

self.client._send_request = _send_request # type: ignore

# ten requests should go through using the max size (500MB/50MB)
for i in range(10):
# first 3 will go through (note that 3rd request technically violates rate limit but
# that since the ratelimiting is applied *after* download it goes through, but next one fails)
for i in range(3):
channel2 = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/remote.org/abc{i}",
Expand All @@ -1841,7 +1848,7 @@ async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
)
assert channel2.code == 200

# eleventh will hit ratelimit
# 4th will hit ratelimit
channel3 = self.make_request(
"GET",
"/_matrix/client/v1/media/download/remote.org/abcd",
Expand All @@ -1850,6 +1857,39 @@ async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
)
assert channel3.code == 429

@override_config({"max_upload_size": "29M"})
@patch(
"synapse.http.matrixfederationclient.read_multipart_response",
read_multipart_response_30MiB,
)
def test_max_download_respected(self) -> None:
"""
Test that the max download size is enforced - note that max download size is determined
by the max_upload_size
"""

# mock out actually sending the request, returns a 30MiB response
async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
resp = MagicMock(spec=IResponse)
resp.code = 200
resp.length = 31457280
resp.headers = Headers(
{"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]}
)
resp.phrase = b"OK"
return resp

self.client._send_request = _send_request # type: ignore

channel = self.make_request(
"GET",
"/_matrix/client/v1/media/download/remote.org/abcd",
shorthand=False,
access_token=self.tok,
)
assert channel.code == 502
assert channel.json_body["errcode"] == "M_TOO_LARGE"

def test_file_download(self) -> None:
content = io.BytesIO(b"file_to_stream")
content_uri = self.get_success(
Expand Down

0 comments on commit 429ecb7

Please sign in to comment.