Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Implement MSC3984 to proxy /keys/query requests to appservices. (#15321)
Browse files Browse the repository at this point in the history
If enabled, for users which are exclusively owned by an application
service then the appservice will be queried for devices in addition
to any information stored in the Synapse database.
  • Loading branch information
clokep authored Mar 30, 2023
1 parent d9f6949 commit ae4acda
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 48 deletions.
2 changes: 1 addition & 1 deletion changelog.d/15314.feature
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Experimental support for passing One Time Key requests to application services ([MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983)).
Experimental support for passing One Time Key and device key requests to application services ([MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983) and [MSC3984](https://github.com/matrix-org/matrix-spec-proposals/pull/3984)).
1 change: 1 addition & 0 deletions changelog.d/15321.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Experimental support for passing One Time Key and device key requests to application services ([MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983) and [MSC3984](https://github.com/matrix-org/matrix-spec-proposals/pull/3984)).
54 changes: 50 additions & 4 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
from typing_extensions import TypeGuard

from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.appservice import (
ApplicationService,
TransactionOneTimeKeysCount,
TransactionUnusedFallbackKeys,
)
from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig, serialize_event
from synapse.http.client import SimpleHttpClient
from synapse.http.client import SimpleHttpClient, is_unknown_endpoint
from synapse.types import DeviceListUpdates, JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache

Expand Down Expand Up @@ -393,7 +393,11 @@ async def claim_client_keys(
) -> Tuple[Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str]]]:
"""Claim one time keys from an application service.
Note that any error (including a timeout) is treated as the application
service having no information.
Args:
service: The application service to query.
query: An iterable of tuples of (user ID, device ID, algorithm).
Returns:
Expand Down Expand Up @@ -422,9 +426,9 @@ async def claim_client_keys(
body,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
except CodeMessageException as e:
except HttpResponseException as e:
# The appservice doesn't support this endpoint.
if e.code == 404 or e.code == 405:
if is_unknown_endpoint(e):
return {}, query
logger.warning("claim_keys to %s received %s", uri, e.code)
return {}, query
Expand All @@ -444,6 +448,48 @@ async def claim_client_keys(

return response, missing

async def query_keys(
self, service: "ApplicationService", query: Dict[str, List[str]]
) -> Dict[str, Dict[str, Dict[str, JsonDict]]]:
"""Query the application service for keys.
Note that any error (including a timeout) is treated as the application
service having no information.
Args:
service: The application service to query.
query: An iterable of tuples of (user ID, device ID, algorithm).
Returns:
A map of device_keys/master_keys/self_signing_keys/user_signing_keys:
device_keys is a map of user ID -> a map device ID -> device info.
"""
if service.url is None:
return {}

# This is required by the configuration.
assert service.hs_token is not None

uri = f"{service.url}/_matrix/app/unstable/org.matrix.msc3984/keys/query"
try:
response = await self.post_json_get_json(
uri,
query,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
except HttpResponseException as e:
# The appservice doesn't support this endpoint.
if is_unknown_endpoint(e):
return {}
logger.warning("query_keys to %s received %s", uri, e.code)
return {}
except Exception as ex:
logger.warning("query_keys to %s threw exception %s", uri, ex)
return {}

return response

def _serialize(
self, service: "ApplicationService", events: Iterable[EventBase]
) -> List[JsonDict]:
Expand Down
5 changes: 5 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"msc3983_appservice_otk_claims", False
)

# MSC3984: Proxying key queries to exclusive ASes.
self.msc3984_appservice_key_query: bool = experimental.get(
"msc3984_appservice_key_query", False
)

# MSC3706 (server-side support for partial state in /send_join responses)
# Synapse will always serve partial state responses to requests using the stable
# query parameter `omit_members`. If this flag is set, Synapse will also serve
Expand Down
48 changes: 6 additions & 42 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
event_from_pdu_json,
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.client import is_unknown_endpoint
from synapse.http.types import QueryParams
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
Expand Down Expand Up @@ -759,43 +760,6 @@ async def get_event_auth(

return signed_auth

def _is_unknown_endpoint(
self, e: HttpResponseException, synapse_error: Optional[SynapseError] = None
) -> bool:
"""
Returns true if the response was due to an endpoint being unimplemented.
Args:
e: The error response received from the remote server.
synapse_error: The above error converted to a SynapseError. This is
automatically generated if not provided.
"""
if synapse_error is None:
synapse_error = e.to_synapse_error()
# MSC3743 specifies that servers should return a 404 or 405 with an errcode
# of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
# to an unknown method, respectively.
#
# Older versions of servers don't properly handle this. This needs to be
# rather specific as some endpoints truly do return 404 errors.
return (
# 404 is an unknown endpoint, 405 is a known endpoint, but unknown method.
(e.code == 404 or e.code == 405)
and (
# Older Dendrites returned a text or empty body.
# Older Conduit returned an empty body.
not e.response
or e.response == b"404 page not found"
# The proper response JSON with M_UNRECOGNIZED errcode.
or synapse_error.errcode == Codes.UNRECOGNIZED
)
) or (
# Older Synapses returned a 400 error.
e.code == 400
and synapse_error.errcode == Codes.UNRECOGNIZED
)

async def _try_destination_list(
self,
description: str,
Expand Down Expand Up @@ -887,7 +851,7 @@ async def _try_destination_list(
elif 400 <= e.code < 500 and synapse_error.errcode in failover_errcodes:
failover = True

elif failover_on_unknown_endpoint and self._is_unknown_endpoint(
elif failover_on_unknown_endpoint and is_unknown_endpoint(
e, synapse_error
):
failover = True
Expand Down Expand Up @@ -1223,7 +1187,7 @@ async def _do_send_join(
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# and raise.
if not self._is_unknown_endpoint(e):
if not is_unknown_endpoint(e):
raise

logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
Expand Down Expand Up @@ -1297,7 +1261,7 @@ async def _do_send_invite(
# fallback to the v1 endpoint if the room uses old-style event IDs.
# Otherwise, consider it a legitimate error and raise.
err = e.to_synapse_error()
if self._is_unknown_endpoint(e, err):
if is_unknown_endpoint(e, err):
if room_version.event_format != EventFormatVersions.ROOM_V1_V2:
raise SynapseError(
400,
Expand Down Expand Up @@ -1358,7 +1322,7 @@ async def _do_send_leave(self, destination: str, pdu: EventBase) -> JsonDict:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# and raise.
if not self._is_unknown_endpoint(e):
if not is_unknown_endpoint(e):
raise

logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")
Expand Down Expand Up @@ -1629,7 +1593,7 @@ async def send_request(
# If an error is received that is due to an unrecognised endpoint,
# fallback to the unstable endpoint. Otherwise, consider it a
# legitimate error and raise.
if not self._is_unknown_endpoint(e):
if not is_unknown_endpoint(e):
raise

logger.debug(
Expand Down
61 changes: 61 additions & 0 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Dict,
Iterable,
List,
Mapping,
Optional,
Tuple,
Union,
Expand Down Expand Up @@ -846,6 +847,10 @@ async def claim_e2e_one_time_keys(
]:
"""Claim one time keys from application services.
Users which are exclusively owned by an application service are sent a
key claim request to check if the application service provides keys
directly.
Args:
query: An iterable of tuples of (user ID, device ID, algorithm).
Expand Down Expand Up @@ -901,3 +906,59 @@ async def claim_e2e_one_time_keys(
missing.extend(result[1])

return claimed_keys, missing

async def query_keys(
self, query: Mapping[str, Optional[List[str]]]
) -> Dict[str, Dict[str, Dict[str, JsonDict]]]:
"""Query application services for device keys.
Users which are exclusively owned by an application service are queried
for keys to check if the application service provides keys directly.
Args:
query: map from user_id to a list of devices to query
Returns:
A map from user_id -> device_id -> device details
"""
services = self.store.get_app_services()

# Partition the users by appservice.
query_by_appservice: Dict[str, Dict[str, List[str]]] = {}
for user_id, device_ids in query.items():
if not self.store.get_if_app_services_interested_in_user(user_id):
continue

# Find the associated appservice.
for service in services:
if service.is_exclusive_user(user_id):
query_by_appservice.setdefault(service.id, {})[user_id] = (
device_ids or []
)
continue

# Query each service in parallel.
results = await make_deferred_yieldable(
defer.DeferredList(
[
run_in_background(
self.appservice_api.query_keys,
# We know this must be an app service.
self.store.get_app_service_by_id(service_id), # type: ignore[arg-type]
service_query,
)
for service_id, service_query in query_by_appservice.items()
],
consumeErrors=True,
)
)

# Patch together the results -- they are all independent (since they
# require exclusive control over the users). They get returned as a single
# dictionary.
key_queries: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for success, result in results:
if success:
key_queries.update(result)

return key_queries
16 changes: 16 additions & 0 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def __init__(self, hs: "HomeServer"):
self._query_appservices_for_otks = (
hs.config.experimental.msc3983_appservice_otk_claims
)
self._query_appservices_for_keys = (
hs.config.experimental.msc3984_appservice_key_query
)

@trace
@cancellable
Expand Down Expand Up @@ -497,6 +500,19 @@ async def query_local_devices(
local_query, include_displaynames
)

# Check if the application services have any additional results.
if self._query_appservices_for_keys:
# Query the appservices for any keys.
appservice_results = await self._appservice_handler.query_keys(query)

# Merge results, overriding with what the appservice returned.
for user_id, devices in appservice_results.get("device_keys", {}).items():
# Copy the appservice device info over the homeserver device info, but
# don't completely overwrite it.
results.setdefault(user_id, {}).update(devices)

# TODO Handle cross-signing keys.

# Build the result structure
for user_id, device_keys in results.items():
for device_id, device_info in device_keys.items():
Expand Down
38 changes: 38 additions & 0 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,3 +966,41 @@ def getContext(self) -> SSL.Context:

def creatorForNetloc(self, hostname: bytes, port: int) -> IOpenSSLContextFactory:
return self


def is_unknown_endpoint(
e: HttpResponseException, synapse_error: Optional[SynapseError] = None
) -> bool:
"""
Returns true if the response was due to an endpoint being unimplemented.
Args:
e: The error response received from the remote server.
synapse_error: The above error converted to a SynapseError. This is
automatically generated if not provided.
"""
if synapse_error is None:
synapse_error = e.to_synapse_error()
# MSC3743 specifies that servers should return a 404 or 405 with an errcode
# of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
# to an unknown method, respectively.
#
# Older versions of servers don't properly handle this. This needs to be
# rather specific as some endpoints truly do return 404 errors.
return (
# 404 is an unknown endpoint, 405 is a known endpoint, but unknown method.
(e.code == 404 or e.code == 405)
and (
# Older Dendrites returned a text body or empty body.
# Older Conduit returned an empty body.
not e.response
or e.response == b"404 page not found"
# The proper response JSON with M_UNRECOGNIZED errcode.
or synapse_error.errcode == Codes.UNRECOGNIZED
)
) or (
# Older Synapses returned a 400 error.
e.code == 400
and synapse_error.errcode == Codes.UNRECOGNIZED
)
Loading

0 comments on commit ae4acda

Please sign in to comment.