This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit: more logging
richvdh committed Jul 28, 2021
1 parent 861e031 commit 900a7ef
Showing 3 changed files with 26 additions and 0 deletions.
6 changes: 6 additions & 0 deletions synapse/federation/federation_server.py
@@ -409,6 +409,8 @@ async def process_pdus_for_room(room_id: str):
for pdu in pdus_by_room[room_id]:
pdu_results[pdu.event_id] = await process_pdu(pdu)

logger.debug("Processed all PDU for %s", room_id)

async def process_pdu(pdu: EventBase) -> JsonDict:
event_id = pdu.event_id
with nested_logging_context(event_id):
@@ -431,6 +433,8 @@ async def process_pdu(pdu: EventBase) -> JsonDict:
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
)

logger.debug("Processed all PDUs in txn")

if newest_pdu_ts and origin in self._federation_metrics_domains:
last_pdu_ts_metric.labels(server_name=origin).set(newest_pdu_ts / 1000)

@@ -456,6 +460,8 @@ async def _process_edu(edu_dict):
TRANSACTION_CONCURRENCY_LIMIT,
)

logger.debug("Processed all EDUs in txn")

async def on_room_state_request(
self, origin: str, room_id: str, event_id: Optional[str]
) -> Tuple[int, Dict[str, Any]]:
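The new debug lines in this file bracket the per-room PDU loop and the transaction-wide concurrently_execute call (the EDU hunk follows the same pattern). A minimal sketch of that shape, using plain asyncio and hypothetical names (CONCURRENCY_LIMIT, process_transaction) in place of Synapse's real helpers:

```python
import asyncio
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)

# Hypothetical stand-in for Synapse's TRANSACTION_CONCURRENCY_LIMIT.
CONCURRENCY_LIMIT = 10


async def process_pdu(pdu: Dict[str, Any]) -> Dict[str, Any]:
    # Placeholder for the real per-PDU handling.
    await asyncio.sleep(0)
    return {}


async def process_transaction(
    pdus_by_room: Dict[str, List[Dict[str, Any]]]
) -> Dict[str, Dict[str, Any]]:
    """Process each room's PDUs in order, handle rooms concurrently under a
    semaphore, and log per-room and per-transaction completion."""
    pdu_results: Dict[str, Dict[str, Any]] = {}
    sem = asyncio.Semaphore(CONCURRENCY_LIMIT)

    async def process_pdus_for_room(room_id: str) -> None:
        async with sem:
            for pdu in pdus_by_room[room_id]:
                pdu_results[pdu["event_id"]] = await process_pdu(pdu)
            logger.debug("Processed all PDUs for %s", room_id)

    await asyncio.gather(*(process_pdus_for_room(r) for r in pdus_by_room))
    logger.debug("Processed all PDUs in txn")
    return pdu_results
```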
9 changes: 9 additions & 0 deletions synapse/handlers/federation.py
@@ -2264,9 +2264,11 @@ async def _run_push_actions_and_persist_event(
event, context
)

logger.debug("Persisting %s", event.event_id)
await self.persist_events_and_notify(
event.room_id, [(event, context)], backfilled=backfilled
)
logger.debug("Persisted %s", event.event_id)
except Exception:
run_in_background(
self.store.remove_push_actions_from_staging, event.event_id
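The hunk above brackets persist_events_and_notify with "Persisting"/"Persisted" debug lines while keeping the existing failure path (cleanup kicked off in the background, then re-raise). A small sketch of that pattern with invented names (persist_with_logging, persist, cleanup); a detached asyncio task stands in for Synapse's run_in_background:

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


async def persist_with_logging(
    event_id: str,
    persist: Callable[[str], Awaitable[None]],
    cleanup: Callable[[str], Awaitable[None]],
) -> None:
    """Log before and after the awaited persistence call; on failure, fire the
    cleanup without awaiting it, then re-raise."""
    try:
        logger.debug("Persisting %s", event_id)
        await persist(event_id)
        logger.debug("Persisted %s", event_id)
    except Exception:
        # Synapse uses run_in_background here; a detached task is the closest
        # plain-asyncio equivalent.
        asyncio.get_running_loop().create_task(cleanup(event_id))
        raise
```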
@@ -2512,7 +2514,9 @@ async def _check_for_soft_fail(
e for k, e in current_state_ids.items() if k in auth_types
]

logger.debug("Fetching auth event list %s", current_state_ids_list)
auth_events_map = await self.store.get_events(current_state_ids_list)
logger.debug("Got auth events %s", auth_events_map.values())
current_auth_events = {
(e.type, e.state_key): e for e in auth_events_map.values()
}
@@ -2533,6 +2537,9 @@
)
soft_failed_event_counter.inc()
event.internal_metadata.soft_failed = True
except Exception:
logger.exception("Got exeption during auth check")
raise

async def on_get_missing_events(
self,
@@ -2655,6 +2662,8 @@ async def _check_event_auth(
event, context
)

logger.debug("Completed _check_event_auth")

return context

async def _update_auth_events_and_context_for_auth(
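The soft-fail hunks above add logging around the auth-event fetch and wrap the check so unexpected failures are logged with a traceback before propagating. A rough sketch of that shape; the store object, its get_events call, and the event attributes (.type, .state_key) are assumed to match what the diff shows, and the function name here is invented:

```python
import logging
from typing import Any, Collection, Dict, Tuple

logger = logging.getLogger(__name__)


async def fetch_current_auth_events(
    store: Any,
    current_state_ids: Dict[Tuple[str, str], str],
    auth_types: Collection[Tuple[str, str]],
) -> Dict[Tuple[str, str], Any]:
    """Fetch the auth events referenced by the current state and key them by
    (type, state_key), logging the request and the result; unexpected errors
    are logged and re-raised."""
    try:
        current_state_ids_list = [
            e for k, e in current_state_ids.items() if k in auth_types
        ]
        logger.debug("Fetching auth event list %s", current_state_ids_list)
        auth_events_map = await store.get_events(current_state_ids_list)
        logger.debug("Got auth events %s", auth_events_map.values())
        return {(e.type, e.state_key): e for e in auth_events_map.values()}
    except Exception:
        logger.exception("Got exception during auth check")
        raise
```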
11 changes: 11 additions & 0 deletions synapse/storage/persist_events.py
@@ -167,6 +167,7 @@ async def add_to_queue(
# if the last item in the queue has the same `backfilled` setting,
# we can just add these new events to that item.
if queue and queue[-1].backfilled == backfilled:
logger.debug("Adding persistence batch to the existing item")
end_item = queue[-1]
else:
# need to make a new queue item
@@ -217,14 +218,21 @@ def _handle_queue(self, room_id):
invoked.
"""
if room_id in self._currently_persisting_rooms:
logger.debug("%s is already being persisted; waiting", room_id)
return

self._currently_persisting_rooms.add(room_id)

async def handle_queue_loop():
logger.debug("Starting persist loop for %s", room_id)
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
logger.debug(
"Persisting events in %s: %s",
room_id,
[e.event_id for e, c in item.events_and_contexts],
)
try:
with opentracing.start_active_span_follows_from(
"persist_event_batch",
@@ -238,16 +246,19 @@ async def handle_queue_loop():
item.events_and_contexts, item.backfilled
)
except Exception:
logger.exception("Exception during event persistence")
with PreserveLoggingContext():
item.deferred.errback()
else:
with PreserveLoggingContext():
item.deferred.callback(ret)
logger.debug("Completed event batch in %s", room_id)
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
logger.debug("Ending persist loop for %s", room_id)

# set handle_queue_loop off in the background
run_as_background_process("persist_events", handle_queue_loop)
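Taken together, the logging in this file traces the life of the per-room persistence queue: enqueueing a batch, starting (or skipping) the drain loop, persisting each batch, and tearing the loop down. A condensed sketch of that queue pattern using asyncio futures; the class name and most details are invented here, and Synapse's real implementation (Twisted deferreds, run_as_background_process, opentracing spans, backfilled batching) is considerably richer:

```python
import asyncio
import logging
from collections import deque
from typing import Any, Awaitable, Callable, Deque, Dict, List, Set, Tuple

logger = logging.getLogger(__name__)


class PerRoomPersistQueue:
    """Sketch of a per-room persistence queue: callers enqueue a batch and
    await its future; at most one drain loop runs per room, persisting batches
    in order."""

    def __init__(
        self, persist_batch: Callable[[str, List[Any]], Awaitable[Any]]
    ) -> None:
        self._persist_batch = persist_batch
        self._queues: Dict[str, Deque[Tuple[List[Any], asyncio.Future]]] = {}
        self._currently_persisting_rooms: Set[str] = set()

    async def add_to_queue(self, room_id: str, events: List[Any]) -> Any:
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._queues.setdefault(room_id, deque()).append((events, fut))
        self._handle_queue(room_id)
        return await fut

    def _handle_queue(self, room_id: str) -> None:
        if room_id in self._currently_persisting_rooms:
            logger.debug("%s is already being persisted; waiting", room_id)
            return
        self._currently_persisting_rooms.add(room_id)

        async def handle_queue_loop() -> None:
            logger.debug("Starting persist loop for %s", room_id)
            try:
                queue = self._queues.get(room_id, deque())
                while queue:
                    events, fut = queue.popleft()
                    logger.debug(
                        "Persisting %d events in %s", len(events), room_id
                    )
                    try:
                        ret = await self._persist_batch(room_id, events)
                    except Exception as exc:
                        logger.exception("Exception during event persistence")
                        fut.set_exception(exc)
                    else:
                        fut.set_result(ret)
                    logger.debug("Completed event batch in %s", room_id)
            finally:
                if not self._queues.get(room_id):
                    self._queues.pop(room_id, None)
                self._currently_persisting_rooms.discard(room_id)
                logger.debug("Ending persist loop for %s", room_id)

        # Synapse runs this via run_as_background_process; a detached asyncio
        # task stands in for it here.
        asyncio.get_running_loop().create_task(handle_queue_loop())
```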
