Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Move update_client_ip background job from the main process to the b…
Browse files Browse the repository at this point in the history
…ackground worker. (#12251)
  • Loading branch information
reivilibre authored Apr 1, 2022
1 parent 319a805 commit f871222
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 153 deletions.
1 change: 1 addition & 0 deletions changelog.d/12251.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Offload the `update_client_ip` background job from the main process to the background worker, when using Redis-based replication.
2 changes: 0 additions & 2 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
Expand Down Expand Up @@ -61,7 +60,6 @@ class AdminCmdSlavedStore(
SlavedDeviceStore,
SlavedPushRuleStore,
SlavedEventStore,
SlavedClientIpStore,
BaseSlavedStore,
RoomWorkerStore,
):
Expand Down
2 changes: 0 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore
Expand Down Expand Up @@ -247,7 +246,6 @@ class GenericWorkerSlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedProfileStore,
SlavedClientIpStore,
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
Expand Down
59 changes: 0 additions & 59 deletions synapse/replication/slave/storage/client_ips.py

This file was deleted.

8 changes: 7 additions & 1 deletion synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def __init__(
access_token: str,
ip: str,
user_agent: str,
device_id: str,
device_id: Optional[str],
last_seen: int,
):
self.user_id = user_id
Expand Down Expand Up @@ -389,6 +389,12 @@ def to_line(self) -> str:
)
)

def __repr__(self) -> str:
return (
f"UserIpCommand({self.user_id!r}, .., {self.ip!r}, "
f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
)


class RemoteServerUpCommand(_SimpleCommand):
"""Sent when a worker has detected that a remote server is no longer
Expand Down
48 changes: 35 additions & 13 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ def __init__(self, hs: "HomeServer"):
if self._is_master:
self._server_notices_sender = hs.get_server_notices_sender()

if hs.config.redis.redis_enabled:
# If we're using Redis, it's the background worker that should
# receive USER_IP commands and store the relevant client IPs.
self._should_insert_client_ips = hs.config.worker.run_background_tasks
else:
# If we're NOT using Redis, this must be handled by the master
self._should_insert_client_ips = hs.get_instance_name() == "master"

def _add_command_to_stream_queue(
self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
) -> None:
Expand Down Expand Up @@ -401,23 +409,37 @@ def on_USER_IP(
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()

if self._is_master:
if self._is_master or self._should_insert_client_ips:
# We make a point of only returning an awaitable if there's actually
# something to do; on_USER_IP is not an async function, but
# _handle_user_ip is.
# If on_USER_IP returns an awaitable, it gets scheduled as a
# background process (see `BaseReplicationStreamProtocol.handle_command`).
return self._handle_user_ip(cmd)
else:
# Returning None when this process definitely has nothing to do
# reduces the overhead of handling the USER_IP command, which is
# currently broadcast to all workers regardless of utility.
return None

async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
await self._store.insert_client_ip(
cmd.user_id,
cmd.access_token,
cmd.ip,
cmd.user_agent,
cmd.device_id,
cmd.last_seen,
)

assert self._server_notices_sender is not None
await self._server_notices_sender.on_user_ip(cmd.user_id)
"""
Handles a User IP, branching depending on whether we are the main process
and/or the background worker.
"""
if self._is_master:
assert self._server_notices_sender is not None
await self._server_notices_sender.on_user_ip(cmd.user_id)

if self._should_insert_client_ips:
await self._store.insert_client_ip(
cmd.user_id,
cmd.access_token,
cmd.ip,
cmd.user_agent,
cmd.device_id,
cmd.last_seen,
)

def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
if cmd.instance_name == self._instance_name:
Expand Down Expand Up @@ -698,7 +720,7 @@ def send_user_ip(
access_token: str,
ip: str,
user_agent: str,
device_id: str,
device_id: Optional[str],
last_seen: int,
) -> None:
"""Tell the master that the user made a request."""
Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
from .cache import CacheInvalidationWorkerStore
from .censor_events import CensorEventsStore
from .client_ips import ClientIpStore
from .client_ips import ClientIpWorkerStore
from .deviceinbox import DeviceInboxStore
from .devices import DeviceStore
from .directory import DirectoryStore
Expand All @@ -49,7 +49,7 @@
from .lock import LockStore
from .media_repository import MediaRepositoryStore
from .metrics import ServerMetricsStore
from .monthly_active_users import MonthlyActiveUsersStore
from .monthly_active_users import MonthlyActiveUsersWorkerStore
from .openid import OpenIdStore
from .presence import PresenceStore
from .profile import ProfileStore
Expand Down Expand Up @@ -112,13 +112,13 @@ class DataStore(
AccountDataStore,
EventPushActionsStore,
OpenIdStore,
ClientIpStore,
ClientIpWorkerStore,
DeviceStore,
DeviceInboxStore,
UserDirectoryStore,
GroupServerStore,
UserErasureStore,
MonthlyActiveUsersStore,
MonthlyActiveUsersWorkerStore,
StatsStore,
RelationsStore,
CensorEventsStore,
Expand Down
Loading

0 comments on commit f871222

Please sign in to comment.