Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Improve logging and opentracing for to-device message handling (#14598)
Browse files Browse the repository at this point in the history
A batch of changes intended to make it easier to trace to-device messages through the system.

The intention here is that a client can set a property org.matrix.msgid in any to-device message it sends. That ID is then included in any tracing or logging related to the message. (Suggestions as to where this field should be documented welcome. I'm not enthusiastic about speccing it - it's very much an optional extra to help with debugging.)

I've also generally improved the data we send to opentracing for these messages.
  • Loading branch information
richvdh authored Dec 6, 2022
1 parent cee9445 commit cb59e08
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 46 deletions.
1 change: 1 addition & 0 deletions changelog.d/14598.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve opentracing and logging for to-device message handling.
3 changes: 3 additions & 0 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ class EventContentFields:
# The authorising user for joining a restricted room.
AUTHORISING_USER: Final = "join_authorised_via_users_server"

# an unspecced field added to to-device messages to identify them uniquely-ish
TO_DEVICE_MSGID: Final = "org.matrix.msgid"


class RoomTypes:
"""Understood values of the room_type field of m.room.create events."""
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]
if not message_id:
continue

set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)

edus = [
Edu(
Expand Down
3 changes: 0 additions & 3 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,6 @@ async def _get_to_device_messages(
device_id,
), messages in recipient_device_to_messages.items():
for message_json in messages:
# Remove 'message_id' from the to-device message, as it's an internal ID
message_json.pop("message_id", None)

message_payload.append(
{
"to_user_id": user_id,
Expand Down
36 changes: 22 additions & 14 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Any, Dict

from synapse.api.constants import EduTypes, ToDeviceEventTypes
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
Expand Down Expand Up @@ -216,14 +216,24 @@ async def send_device_message(
"""
sender_user_id = requester.user.to_string()

message_id = random_string(16)
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

log_kv({"number_of_to_device_messages": len(messages)})
set_tag("sender", sender_user_id)
set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():
# add an opentracing log entry for each message
for device_id, message_content in by_device.items():
log_kv(
{
"event": "send_to_device_message",
"user_id": user_id,
"device_id": device_id,
EventContentFields.TO_DEVICE_MSGID: message_content.get(
EventContentFields.TO_DEVICE_MSGID
),
}
)

# Ratelimit local cross-user key requests by the sending device.
if (
message_type == ToDeviceEventTypes.RoomKeyRequest
Expand All @@ -233,6 +243,7 @@ async def send_device_message(
requester, (sender_user_id, requester.device_id)
)
if not allowed:
log_kv({"message": f"dropping key requests to {user_id}"})
logger.info(
"Dropping room_key_request from %s to %s due to rate limit",
sender_user_id,
Expand All @@ -247,18 +258,11 @@ async def send_device_message(
"content": message_content,
"type": message_type,
"sender": sender_user_id,
"message_id": message_id,
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
log_kv(
{
"user_id": user_id,
"device_id": list(messages_by_device),
}
)
else:
destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device
Expand All @@ -267,7 +271,11 @@ async def send_device_message(

remote_edu_contents = {}
for destination, messages in remote_messages.items():
log_kv({"destination": destination})
# The EDU contains a "message_id" property which is used for
# idempotence. Make up a random one.
message_id = random_string(16)
log_kv({"destination": destination, "message_id": message_id})

remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
Expand Down
26 changes: 19 additions & 7 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,20 @@
import attr
from prometheus_client import Counter

from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.logging.opentracing import (
SynapseTags,
log_kv,
set_tag,
start_active_span,
trace,
)
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
Expand Down Expand Up @@ -1586,6 +1592,7 @@ async def _generate_sync_entry_for_device_list(
else:
return DeviceListUpdates()

@trace
async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder"
) -> None:
Expand All @@ -1605,11 +1612,16 @@ async def _generate_sync_entry_for_to_device(
)

for message in messages:
# We pop here as we shouldn't be sending the message ID down
# `/sync`
message_id = message.pop("message_id", None)
if message_id:
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
log_kv(
{
"event": "to_device_message",
"sender": message["sender"],
"type": message["type"],
EventContentFields.TO_DEVICE_MSGID: message["content"].get(
EventContentFields.TO_DEVICE_MSGID
),
}
)

logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
Expand Down
11 changes: 9 additions & 2 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,15 @@ def report_span(self, span: "opentracing.Span") -> None:


class SynapseTags:
# The message ID of any to_device message processed
TO_DEVICE_MESSAGE_ID = "to_device.message_id"
# The message ID of any to_device EDU processed
TO_DEVICE_EDU_ID = "to_device.edu_id"

# Details about to-device messages
TO_DEVICE_TYPE = "to_device.type"
TO_DEVICE_SENDER = "to_device.sender"
TO_DEVICE_RECIPIENT = "to_device.recipient"
TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device"
TO_DEVICE_MSGID = "to_device.msgid" # client-generated ID

# Whether the sync response has new data to be returned to the client.
SYNC_RESULT = "sync.new_data"
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/client/sendtodevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def __init__(self, hs: "HomeServer"):
def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("message_type", message_type)
set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
Expand Down
92 changes: 75 additions & 17 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,15 @@
cast,
)

from synapse.api.constants import EventContentFields
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.logging.opentracing import (
SynapseTags,
log_kv,
set_tag,
start_active_span,
trace,
)
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
Expand Down Expand Up @@ -397,6 +404,17 @@ def get_device_messages_txn(
(recipient_user_id, recipient_device_id), []
).append(message_dict)

# start a new span for each message, so that we can tag each separately
with start_active_span("get_to_device_message"):
set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"])
set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID),
)

if limit is not None and rowcount == limit:
# We ended up bumping up against the message limit. There may be more messages
# to retrieve. Return what we have, as well as the last stream position that
Expand Down Expand Up @@ -678,12 +696,35 @@ def add_messages_txn(
],
)

if remote_messages_by_destination:
issue9533_logger.debug(
"Queued outgoing to-device messages with stream_id %i for %s",
stream_id,
list(remote_messages_by_destination.keys()),
)
for destination, edu in remote_messages_by_destination.items():
if issue9533_logger.isEnabledFor(logging.DEBUG):
issue9533_logger.debug(
"Queued outgoing to-device messages with "
"stream_id %i, EDU message_id %s, type %s for %s: %s",
stream_id,
edu["message_id"],
edu["type"],
destination,
[
f"{user_id}/{device_id} (msgid "
f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
for (user_id, messages_by_device) in edu["messages"].items()
for (device_id, msg) in messages_by_device.items()
],
)

for (user_id, messages_by_device) in edu["messages"].items():
for (device_id, msg) in messages_by_device.items():
with start_active_span("store_outgoing_to_device_message"):
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
msg.get(EventContentFields.TO_DEVICE_MSGID),
)

async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
Expand Down Expand Up @@ -801,7 +842,19 @@ def _add_messages_to_local_device_inbox_txn(
# Only insert into the local inbox if the device exists on
# this server
device_id = row["device_id"]
message_json = json_encoder.encode(messages_by_device[device_id])

with start_active_span("serialise_to_device_message"):
msg = messages_by_device[device_id]
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
)
message_json = json_encoder.encode(msg)

messages_json_for_user[device_id] = message_json

if messages_json_for_user:
Expand All @@ -821,15 +874,20 @@ def _add_messages_to_local_device_inbox_txn(
],
)

issue9533_logger.debug(
"Stored to-device messages with stream_id %i for %s",
stream_id,
[
(user_id, device_id)
for (user_id, messages_by_device) in local_by_user_then_device.items()
for device_id in messages_by_device.keys()
],
)
if issue9533_logger.isEnabledFor(logging.DEBUG):
issue9533_logger.debug(
"Stored to-device messages with stream_id %i: %s",
stream_id,
[
f"{user_id}/{device_id} (msgid "
f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
for (
user_id,
messages_by_device,
) in messages_by_user_then_device.items()
for (device_id, msg) in messages_by_device.items()
],
)


class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
Expand Down
7 changes: 6 additions & 1 deletion tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,12 @@ def test_application_services_receive_bursts_of_to_device(self):
fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)]
messages = {
self.exclusive_as_user: {
device_id: to_device_message_content for device_id in fake_device_ids
device_id: {
"type": "test_to_device_message",
"sender": "@some:sender",
"content": to_device_message_content,
}
for device_id in fake_device_ids
}
}

Expand Down

0 comments on commit cb59e08

Please sign in to comment.