Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Allow bigger responses to /federation/v1/state (#12877)
Browse files: browse the repository at this point in the history
* Refactor HTTP response size limits

Rather than passing a separate `max_response_size` down the stack, make it an
attribute of the `parser`.

* Allow bigger responses on `federation/v1/state`

`/state` can return huge responses, so give its parser the same enlarged response size limit that `/send_join` uses.
  • Loading branch information
richvdh authored May 25, 2022
1 parent 4660d9f commit 1b33847
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 32 deletions.
1 change: 1 addition & 0 deletions changelog.d/12877.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.54 which could sometimes cause exceptions when handling federated traffic.
15 changes: 8 additions & 7 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@

logger = logging.getLogger(__name__)

# Send join responses can be huge, so we set a separate limit here. The response
# is parsed in a streaming manner, which helps alleviate the issue of memory
# usage a bit.
MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024


class TransportLayerClient:
"""Sends federation HTTP requests to other servers"""
Expand Down Expand Up @@ -349,7 +344,6 @@ async def send_join_v1(
path=path,
data=content,
parser=SendJoinParser(room_version, v1_api=True),
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
)

async def send_join_v2(
Expand All @@ -372,7 +366,6 @@ async def send_join_v2(
args=query_params,
data=content,
parser=SendJoinParser(room_version, v1_api=False),
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
)

async def send_leave_v1(
Expand Down Expand Up @@ -1360,6 +1353,11 @@ class SendJoinParser(ByteParser[SendJoinResponse]):

CONTENT_TYPE = "application/json"

# /send_join responses can be huge, so we override the size limit here. The response
# is parsed in a streaming manner, which helps alleviate the issue of memory
# usage a bit.
MAX_RESPONSE_SIZE = 500 * 1024 * 1024

def __init__(self, room_version: RoomVersion, v1_api: bool):
self._response = SendJoinResponse([], [], event_dict={})
self._room_version = room_version
Expand Down Expand Up @@ -1427,6 +1425,9 @@ class _StateParser(ByteParser[StateRequestResponse]):

CONTENT_TYPE = "application/json"

# As with /send_join, /state responses can be huge.
MAX_RESPONSE_SIZE = 500 * 1024 * 1024

def __init__(self, room_version: RoomVersion):
self._response = StateRequestResponse([], [])
self._room_version = room_version
Expand Down
29 changes: 7 additions & 22 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)

# A federation response can be rather large (e.g. a big state_ids is 50M or so), so we
# need a generous limit here.
MAX_RESPONSE_SIZE = 100 * 1024 * 1024

MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
Expand All @@ -116,6 +113,11 @@ class ByteParser(ByteWriteable, Generic[T], abc.ABC):
the content type doesn't match we fail the request.
"""

# A federation response can be rather large (e.g. a big state_ids is 50M or so), so we
# need a generous limit here.
MAX_RESPONSE_SIZE: int = 100 * 1024 * 1024
"""The largest response this parser will accept."""

@abc.abstractmethod
def finish(self) -> T:
"""Called when response has finished streaming and the parser should
Expand Down Expand Up @@ -203,7 +205,6 @@ async def _handle_response(
response: IResponse,
start_ms: int,
parser: ByteParser[T],
max_response_size: Optional[int] = None,
) -> T:
"""
Reads the body of a response with a timeout and sends it to a parser
Expand All @@ -215,15 +216,12 @@ async def _handle_response(
response: response to the request
start_ms: Timestamp when request was made
parser: The parser for the response
max_response_size: The maximum size to read from the response, if None
uses the default.
Returns:
The parsed response
"""

if max_response_size is None:
max_response_size = MAX_RESPONSE_SIZE
max_response_size = parser.MAX_RESPONSE_SIZE

try:
check_content_type_is(response.headers, parser.CONTENT_TYPE)
Expand All @@ -240,7 +238,7 @@ async def _handle_response(
"{%s} [%s] JSON response exceeded max size %i - %s %s",
request.txn_id,
request.destination,
MAX_RESPONSE_SIZE,
max_response_size,
request.method,
request.uri.decode("ascii"),
)
Expand Down Expand Up @@ -772,7 +770,6 @@ async def put_json(
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Literal[None] = None,
max_response_size: Optional[int] = None,
) -> Union[JsonDict, list]:
...

Expand All @@ -790,7 +787,6 @@ async def put_json(
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser[T]] = None,
max_response_size: Optional[int] = None,
) -> T:
...

Expand All @@ -807,7 +803,6 @@ async def put_json(
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser] = None,
max_response_size: Optional[int] = None,
):
"""Sends the specified json data using PUT
Expand Down Expand Up @@ -843,8 +838,6 @@ async def put_json(
enabled.
parser: The parser to use to decode the response. Defaults to
parsing as JSON.
max_response_size: The maximum size to read from the response, if None
uses the default.
Returns:
Succeeds when we get a 2xx HTTP response. The
Expand Down Expand Up @@ -895,7 +888,6 @@ async def put_json(
response,
start_ms,
parser=parser,
max_response_size=max_response_size,
)

return body
Expand Down Expand Up @@ -984,7 +976,6 @@ async def get_json(
ignore_backoff: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Literal[None] = None,
max_response_size: Optional[int] = None,
) -> Union[JsonDict, list]:
...

Expand All @@ -999,7 +990,6 @@ async def get_json(
ignore_backoff: bool = ...,
try_trailing_slash_on_400: bool = ...,
parser: ByteParser[T] = ...,
max_response_size: Optional[int] = ...,
) -> T:
...

Expand All @@ -1013,7 +1003,6 @@ async def get_json(
ignore_backoff: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser] = None,
max_response_size: Optional[int] = None,
):
"""GETs some json from the given host homeserver and path
Expand Down Expand Up @@ -1043,9 +1032,6 @@ async def get_json(
parser: The parser to use to decode the response. Defaults to
parsing as JSON.
max_response_size: The maximum size to read from the response. If None,
uses the default.
Returns:
Succeeds when we get a 2xx HTTP response. The
result will be the decoded JSON body.
Expand Down Expand Up @@ -1090,7 +1076,6 @@ async def get_json(
response,
start_ms,
parser=parser,
max_response_size=max_response_size,
)

return body
Expand Down
6 changes: 3 additions & 3 deletions tests/http/test_fedclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from synapse.api.errors import RequestSendFailed
from synapse.http.matrixfederationclient import (
MAX_RESPONSE_SIZE,
JsonParser,
MatrixFederationHttpClient,
MatrixFederationRequest,
)
Expand Down Expand Up @@ -609,9 +609,9 @@ def test_too_big(self):
while not test_d.called:
protocol.dataReceived(b"a" * chunk_size)
sent += chunk_size
self.assertLessEqual(sent, MAX_RESPONSE_SIZE)
self.assertLessEqual(sent, JsonParser.MAX_RESPONSE_SIZE)

self.assertEqual(sent, MAX_RESPONSE_SIZE)
self.assertEqual(sent, JsonParser.MAX_RESPONSE_SIZE)

f = self.failureResultOf(test_d)
self.assertIsInstance(f.value, RequestSendFailed)
Expand Down

0 comments on commit 1b33847

Please sign in to comment.