Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Federation outbound proxy #15773

Merged
merged 47 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
eb6e132
Proxy federation requests
erikjohnston Mar 31, 2023
6a95e7a
Make configurable
erikjohnston Apr 28, 2023
f0270aa
Cache the fed proxy
erikjohnston May 10, 2023
6d98582
Accept a list of federation proxies
erikjohnston May 10, 2023
5889396
Make configurable
erikjohnston May 10, 2023
58fe4da
Comment
erikjohnston May 10, 2023
f00fedd
Remove unused class
erikjohnston May 15, 2023
41c5747
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 13, 2023
f219f0e
Add changelog
MadLittleMods Jun 13, 2023
c998d28
Avoid negated condition
MadLittleMods Jun 13, 2023
cc05c97
Fix tests and align to new `matrix-federation://` schema
MadLittleMods Jun 13, 2023
8cfad3d
Fix lints
MadLittleMods Jun 13, 2023
9eec614
WORKER PROXY WIP
erikjohnston May 10, 2023
e9e900f
Align scheme checking
MadLittleMods Jun 14, 2023
dcb4105
Fix lints
MadLittleMods Jun 14, 2023
c6dcd5e
Refactor tests to use `get_clock()`
MadLittleMods Jun 14, 2023
f139898
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 14, 2023
8f9f478
Fix tests (make sure `federation_http_client` is defined)
MadLittleMods Jun 14, 2023
e789c64
Fix tests
MadLittleMods Jun 14, 2023
0cead40
Fix lints
MadLittleMods Jun 14, 2023
11bf041
Maybe fix more replication tests
MadLittleMods Jun 14, 2023
d847564
Mark out spots to add docs
MadLittleMods Jun 15, 2023
74988e2
WIP: Very rough worker test
MadLittleMods Jun 16, 2023
6b44e66
Cleaned up test
MadLittleMods Jun 16, 2023
8af2fb8
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 20, 2023
1abd3b1
Clean up test
MadLittleMods Jun 20, 2023
477844c
Explain why we care about catching `PotentialDataLoss`
MadLittleMods Jun 20, 2023
dac5532
Add some more context
MadLittleMods Jun 20, 2023
cf208d2
Test error case
MadLittleMods Jun 20, 2023
e665fa8
Flesh out docstrings and comments
MadLittleMods Jun 20, 2023
2ce2025
Update docs
MadLittleMods Jun 20, 2023
632544a
Add some background behind `matrix-federation://`
MadLittleMods Jun 20, 2023
033e18a
Align language
MadLittleMods Jun 20, 2023
b5e916e
Revert back to debug level
MadLittleMods Jun 20, 2023
484680f
Merge branch 'develop' into erikj/fed_proxy
erikjohnston Jun 20, 2023
2032ea6
`master`/`main` is in the `instance_map` so no need to skip checking …
MadLittleMods Jun 21, 2023
926e3e0
Remove extra proxy logging
MadLittleMods Jun 21, 2023
0a2a9cf
Do not copy over hop-by-hop headers
MadLittleMods Jun 21, 2023
c757a38
Add tests for `parse_connection_header_value`
MadLittleMods Jun 21, 2023
be12f21
Add tests to make sure headers are removed
MadLittleMods Jun 21, 2023
735203e
Ignore lint
MadLittleMods Jun 21, 2023
9e3881f
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 21, 2023
c1ec014
Fix `arg-type` lint
MadLittleMods Jun 21, 2023
d400b50
Simplify `parse_connection_header_value`
MadLittleMods Jun 21, 2023
e99a5e9
Use safe `json.dumps` for JSON response
MadLittleMods Jun 27, 2023
074fe0c
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 28, 2023
d3292d2
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15773.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow configuring the set of workers to proxy outbound federation traffic through via `outbound_federation_restricted_to`.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion contrib/lnav/synapse-log-format.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"level": "error"
},
{
"line": "my-matrix-server-federation-sender-1 | 2023-01-25 20:56:20,995 - synapse.http.matrixfederationclient - 709 - WARNING - federation_transaction_transmission_loop-3 - {PUT-O-3} [example.com] Request failed: PUT matrix://example.com/_matrix/federation/v1/send/1674680155797: HttpResponseException('403: Forbidden')",
"line": "my-matrix-server-federation-sender-1 | 2023-01-25 20:56:20,995 - synapse.http.matrixfederationclient - 709 - WARNING - federation_transaction_transmission_loop-3 - {PUT-O-3} [example.com] Request failed: PUT matrix-federation://example.com/_matrix/federation/v1/send/1674680155797: HttpResponseException('403: Forbidden')",
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
"level": "warning"
},
{
Expand Down
4 changes: 2 additions & 2 deletions scripts-dev/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ def request(
authorization_headers.append(header)
print("Authorization: %s" % header, file=sys.stderr)

dest = "matrix://%s%s" % (destination, path)
dest = "matrix-federation://%s%s" % (destination, path)
print("Requesting %s" % dest, file=sys.stderr)

s = requests.Session()
s.mount("matrix://", MatrixConnectionAdapter())
s.mount("matrix-federation://", MatrixConnectionAdapter())

headers: Dict[str, str] = {
"Authorization": authorization_headers[0],
Expand Down
36 changes: 35 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import argparse
import logging
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Optional, Union

import attr
from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
Expand Down Expand Up @@ -148,6 +148,23 @@ class WriterLocations:
)


@attr.s(auto_attribs=True)
class OutboundFederationRestrictedTo:
"""Whether we limit outbound federation to a certain set of instances.

Attributes:
instances: optional list of instances that can make outbound federation
requests. If None then all instances can make federation requests.
locations: list of instance locations to connect to proxy via.
"""

instances: Optional[List[str]]
locations: List[InstanceLocationConfig] = attr.Factory(list)

def __contains__(self, instance: str) -> bool:
return self.instances is None or instance in self.instances


class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
Expand Down Expand Up @@ -357,6 +374,23 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
new_option_name="update_user_directory_from_worker",
)

outbound_federation_restricted_to = config.get(
"outbound_federation_restricted_to", None
)
self.outbound_federation_restricted_to = OutboundFederationRestrictedTo(
outbound_federation_restricted_to
)
if outbound_federation_restricted_to:
for instance in outbound_federation_restricted_to:
if instance != "master" and instance not in self.instance_map:
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
raise ConfigError(
"Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config."
% (instance,)
)
self.outbound_federation_restricted_to.locations.append(
self.instance_map[instance]
)

def _should_this_worker_perform_duty(
self,
config: Dict[str, Any],
Expand Down
10 changes: 5 additions & 5 deletions synapse/http/federation/matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
@implementer(IAgent)
class MatrixFederationAgent:
"""An Agent-like thing which provides a `request` method which correctly
handles resolving matrix server names when using matrix://. Handles standard
handles resolving matrix server names when using matrix-federation://. Handles standard
https URIs as normal.

Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
Expand Down Expand Up @@ -167,14 +167,14 @@ def request(
# There must be a valid hostname.
assert parsed_uri.hostname

# If this is a matrix:// URI check if the server has delegated matrix
# If this is a matrix-federation:// URI check if the server has delegated matrix
# traffic using well-known delegation.
#
# We have to do this here and not in the endpoint as we need to rewrite
# the host header with the delegated server name.
delegated_server = None
if (
parsed_uri.scheme == b"matrix"
parsed_uri.scheme == b"matrix-federation"
and not _is_ip_literal(parsed_uri.hostname)
and not parsed_uri.port
):
Expand Down Expand Up @@ -250,7 +250,7 @@ def endpointForURI(self, parsed_uri: URI) -> "MatrixHostnameEndpoint":

@implementer(IStreamClientEndpoint)
class MatrixHostnameEndpoint:
"""An endpoint that resolves matrix:// URLs using Matrix server name
"""An endpoint that resolves matrix-federation:// URLs using Matrix server name
resolution (i.e. via SRV). Does not check for well-known delegation.

Args:
Expand Down Expand Up @@ -379,7 +379,7 @@ async def _resolve_server(self) -> List[Server]:
connect to.
"""

if self._parsed_uri.scheme != b"matrix":
if self._parsed_uri.scheme != b"matrix-federation":
return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)]

# Note: We don't do well-known lookup as that needs to have happened
Expand Down
37 changes: 29 additions & 8 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer, IResponse
from twisted.web.iweb import IAgent, IBodyProducer, IResponse

import synapse.metrics
import synapse.util.retryutils
Expand All @@ -72,6 +72,7 @@
read_body_with_max_size,
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
Expand Down Expand Up @@ -172,7 +173,14 @@ def __attrs_post_init__(self) -> None:

# The object is frozen so we can pre-compute this.
uri = urllib.parse.urlunparse(
(b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
(
b"matrix-federation",
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
destination_bytes,
path_bytes,
None,
query_bytes,
b"",
)
)
object.__setattr__(self, "uri", uri)

Expand Down Expand Up @@ -386,13 +394,26 @@ def __init__(
if hs.config.server.user_agent_suffix:
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)

federation_agent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
outbound_federation_restricted_to = (
hs.config.worker.outbound_federation_restricted_to
)
if hs.get_instance_name() in outbound_federation_restricted_to:
logger.info("asdf")
federation_agent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
)
else:
federation_proxies = outbound_federation_restricted_to.locations
federation_agent: IAgent = ProxyAgent(
self.reactor,
self.reactor,
tls_client_options_factory,
federation_proxies=federation_proxies,
)

# Use a BlocklistingAgentWrapper to prevent circumventing the IP
# blocking via IP literals in server names
Expand Down
75 changes: 73 additions & 2 deletions synapse/http/proxyagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
import re
from typing import Any, Dict, Optional, Tuple
from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple
from urllib.parse import urlparse
from urllib.request import ( # type: ignore[attr-defined]
getproxies_environment,
Expand All @@ -24,7 +25,12 @@

from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint
from twisted.internet.interfaces import (
IProtocol,
IProtocolFactory,
IReactorCore,
IStreamClientEndpoint,
)
from twisted.python.failure import Failure
from twisted.web.client import (
URI,
Expand All @@ -36,8 +42,10 @@
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse

from synapse.config.workers import InstanceLocationConfig
from synapse.http import redact_uri
from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials
from synapse.logging.context import run_in_background

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -89,6 +97,7 @@ def __init__(
bindAddress: Optional[bytes] = None,
pool: Optional[HTTPConnectionPool] = None,
use_proxy: bool = False,
federation_proxies: Collection[InstanceLocationConfig] = (),
):
contextFactory = contextFactory or BrowserLikePolicyForHTTPS()

Expand Down Expand Up @@ -127,6 +136,27 @@ def __init__(
self._policy_for_https = contextFactory
self._reactor = reactor

self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None
if federation_proxies:
endpoints = []
for federation_proxy in federation_proxies:
endpoint = HostnameEndpoint(
self.proxy_reactor,
federation_proxy.host,
federation_proxy.port,
)

if federation_proxy.tls:
tls_connection_creator = self._policy_for_https.creatorForNetloc(
federation_proxy.host,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@realtyem mentioned that we probably need to copy what https://github.com/matrix-org/synapse/pull/15746/files did for TLS to work properly. host comes in as a str and creatorForNetloc(...) is expecting bytes.

Suggested change
federation_proxy.host,
federation_proxy.host.encode("utf-8"),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #15886

federation_proxy.port,
)
endpoint = wrapClientTLS(tls_connection_creator, endpoint)

endpoints.append(endpoint)

self._federation_proxy_endpoint = _ProxyEndpoints(endpoints)

def request(
self,
method: bytes,
Expand Down Expand Up @@ -214,6 +244,14 @@ def request(
parsed_uri.port,
self.https_proxy_creds,
)
elif (
parsed_uri.scheme == b"matrix-federation"
and self._federation_proxy_endpoint
):
# Cache *all* connections under the same key, since we are only
# connecting to a single destination, the proxy:
endpoint = self._federation_proxy_endpoint
request_path = uri
else:
# not using a proxy
endpoint = HostnameEndpoint(
Expand All @@ -233,6 +271,11 @@ def request(
endpoint = wrapClientTLS(tls_connection_creator, endpoint)
elif parsed_uri.scheme == b"http":
pass
elif (
parsed_uri.scheme == b"matrix-federation"
and self._federation_proxy_endpoint
):
pass
else:
return defer.fail(
Failure(
Expand Down Expand Up @@ -337,3 +380,31 @@ def parse_proxy(
credentials = ProxyCredentials(b"".join([url.username, b":", url.password]))

return url.scheme, url.hostname, url.port or default_port, credentials


@implementer(IStreamClientEndpoint)
class _ProxyEndpoints:
"""An endpoint that randomly iterates through a given list of endpoints at
each connection attempt.
"""

def __init__(self, endpoints: Sequence[IStreamClientEndpoint]) -> None:
assert endpoints
self._endpoints = endpoints

def connect(
self, protocol_factory: IProtocolFactory
) -> "defer.Deferred[IProtocol]":
"""Implements IStreamClientEndpoint interface"""

return run_in_background(self._do_connect, protocol_factory)

async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
failures: List[Failure] = []
for endpoint in random.sample(self._endpoints, k=len(self._endpoints)):
try:
return await endpoint.connect(protocol_factory)
except Exception:
failures.append(Failure())

failures.pop().raiseException()
4 changes: 2 additions & 2 deletions tests/federation/test_federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_get_room_state(self) -> None:
# check the right call got made to the agent
self._mock_agent.request.assert_called_once_with(
b"GET",
b"matrix://yet.another.server/_matrix/federation/v1/state/%21room_id?event_id=event_id",
b"matrix-federation://yet.another.server/_matrix/federation/v1/state/%21room_id?event_id=event_id",
headers=mock.ANY,
bodyProducer=None,
)
Expand Down Expand Up @@ -232,7 +232,7 @@ def _get_pdu_once(self) -> EventBase:
# check the right call got made to the agent
self._mock_agent.request.assert_called_once_with(
b"GET",
b"matrix://yet.another.server/_matrix/federation/v1/event/event_id",
b"matrix-federation://yet.another.server/_matrix/federation/v1/event/event_id",
headers=mock.ANY,
bodyProducer=None,
)
Expand Down
Loading