Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't go into federation catch up mode so easily #9561

Merged
merged 9 commits into from
Mar 15, 2021
53 changes: 40 additions & 13 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,23 @@
# limitations under the License.
import datetime
import logging
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
from collections import deque
from typing import (
TYPE_CHECKING,
Dict,
Hashable,
Iterable,
Iterator,
List,
Optional,
Tuple,
TypeVar,
cast,
)

import attr
from prometheus_client import Counter
from typing_extensions import Deque

from synapse.api.errors import (
FederationDeniedError,
Expand Down Expand Up @@ -113,8 +126,8 @@ def __init__(
# destination (we are the only updater so this is safe)
self._last_successful_stream_ordering = None # type: Optional[int]

# a list of pending PDUs
self._pending_pdus = [] # type: List[EventBase]
# a queue of pending PDUs
self._pending_pdus = deque() # type: Deque[EventBase]

# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
Expand Down Expand Up @@ -500,7 +513,7 @@ def _start_catching_up(self) -> None:
This throws away the PDU queue.
"""
self._catching_up = True
self._pending_pdus = []
self._pending_pdus = deque()


@attr.s(slots=True)
Expand All @@ -514,6 +527,7 @@ class _TransactionQueueManager:
_device_stream_id = attr.ib(type=Optional[int], default=None)
_device_list_id = attr.ib(type=Optional[int], default=None)
_last_stream_ordering = attr.ib(type=Optional[int], default=None)
_pdus = attr.ib(type=List[EventBase], factory=list)

async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
# We have to keep 2 free slots for presence and rr_edus
Expand Down Expand Up @@ -542,10 +556,8 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:

pending_edus = device_update_edus + to_device_edus
clokep marked this conversation as resolved.
Show resolved Hide resolved

pending_pdus = self.queue._pending_pdus

# We can only include at most 50 PDUs per transactions
pending_pdus, self.queue._pending_pdus = pending_pdus[:50], pending_pdus[50:]
# Get up to 50 PDUs from the queue
self._pdus = list(_popleft_upto_n_items_deque(self.queue._pending_pdus, 50))

pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
pending_presence = self.queue._pending_presence
Expand Down Expand Up @@ -577,25 +589,29 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
_, val = self.queue._pending_edus_keyed.popitem()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied & pasted, but popitem is LIFO so it is theoretically possible you'll never reach the bottom of this queue, which is annoying. 😢

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yeah. In a separate PR i was going to make it so that we don't drop the EDUs if the transaction fails (in the same way as for PDUs), so it'll probably get magically fixed there?

pending_edus.append(val)

if not pending_pdus and not pending_edus:
if not self._pdus and not pending_edus:
return [], []

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self.queue._get_rr_edus(force_flush=True))

if pending_pdus:
self._last_stream_ordering = pending_pdus[
if self._pdus:
self._last_stream_ordering = self._pdus[
-1
].internal_metadata.stream_ordering
assert self._last_stream_ordering

return pending_pdus, pending_edus
return self._pdus, pending_edus

async def __aexit__(self, exc_type, exc, tb):
if exc_type is not None:
# Failed to send transaction, nothing to do.
# Failed to send transaction. Requeue events we failed to send this
# time round.
if self._pdus:
self.queue._pending_pdus.extendleft(self._pdus)
clokep marked this conversation as resolved.
Show resolved Hide resolved

return

# Succeeded to send the transaction so we record where we have sent up
Expand Down Expand Up @@ -623,3 +639,14 @@ async def __aexit__(self, exc_type, exc, tb):
await self.queue._store.set_destination_last_successful_stream_ordering(
self.queue._destination, self._last_stream_ordering
)


T = TypeVar("T")


def _popleft_upto_n_items_deque(d: Deque[T], n: int) -> Iterator[T]:
"Pops upto N items from the left of the deque."

while d and n:
yield d.popleft()
n -= 1