This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move additional tasks to the background worker, part 3 #8489

Merged
merged 8 commits on Oct 9, 2020
1 change: 1 addition & 0 deletions changelog.d/8489.feature
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.
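
The pattern repeated across the files below has two halves: each periodic job becomes an ordinary async method decorated with `@wrap_as_background_process(...)` (replacing the ad-hoc `run_as_background_process` shims), and the store's `__init__` only schedules the job via `looping_call` when `hs.config.run_background_tasks` is set for the current process. A minimal, self-contained sketch of that shape follows; it is not Synapse's actual implementation, and the `ExampleStore` class, the simplified decorator body, and the `clock`/`config` parameters are illustrative assumptions only.

```python
import asyncio
import functools


def wrap_as_background_process(desc: str):
    """Illustrative stand-in for Synapse's decorator of the same name: each
    call to the wrapped coroutine is scheduled as a named background task
    instead of being awaited inline (the real decorator also records
    Prometheus metrics and sets up a logging context)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Requires a running asyncio event loop at call time.
            return asyncio.create_task(func(*args, **kwargs), name=desc)
        return wrapper
    return decorator


class ExampleStore:
    """Hypothetical store showing the shape of the pattern in this PR."""

    def __init__(self, clock, config):
        # Only the process configured to run background tasks schedules the
        # periodic job; every other worker merely has the method defined.
        if config.run_background_tasks:
            clock.looping_call(self.reap_monthly_active_users, 60 * 60 * 1000)

    @wrap_as_background_process("reap_monthly_active_users")
    async def reap_monthly_active_users(self) -> None:
        ...  # the actual cleanup work would go here
```

Because the scheduling is now conditional, all of these loops can run on a single designated worker while the method definitions remain importable everywhere.
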
14 changes: 3 additions & 11 deletions synapse/app/phone_stats_home.py
@@ -18,10 +18,7 @@

from prometheus_client import Gauge

from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.metrics.background_process_metrics import wrap_as_background_process

logger = logging.getLogger("synapse.app.homeserver")

@@ -152,13 +149,8 @@ def performance_stats_init():
clock.looping_call(hs.get_datastore().generate_user_daily_visits, 5 * 60 * 1000)

# monthly active user limiting functionality
def reap_monthly_active_users():
return run_as_background_process(
"reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
)

clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()
clock.looping_call(hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60)
hs.get_datastore().reap_monthly_active_users()

@wrap_as_background_process("generate_monthly_active_users")
async def generate_monthly_active_users():
15 changes: 7 additions & 8 deletions synapse/storage/databases/main/censor_events.py
@@ -17,7 +17,7 @@
from typing import TYPE_CHECKING

from synapse.events.utils import prune_event_dict
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
@@ -35,14 +35,13 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
super().__init__(database, db_conn, hs)

def _censor_redactions():
return run_as_background_process(
"_censor_redactions", self._censor_redactions
)

if self.hs.config.redaction_retention_period is not None:
hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
if (
hs.config.run_background_tasks
and self.hs.config.redaction_retention_period is not None
):
hs.get_clock().looping_call(self._censor_redactions, 5 * 60 * 1000)

@wrap_as_background_process("_censor_redactions")
async def _censor_redactions(self):
"""Censors all redactions older than the configured period that haven't
been censored yet.
196 changes: 101 additions & 95 deletions synapse/storage/databases/main/devices.py
@@ -25,7 +25,7 @@
trace,
whitelisted_homeserver,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -48,6 +48,14 @@


class DeviceWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

if hs.config.run_background_tasks:
self._clock.looping_call(
self._prune_old_outbound_device_pokes, 60 * 60 * 1000
)

async def get_device(self, user_id: str, device_id: str) -> Dict[str, Any]:
"""Retrieve a device. Only returns devices that are not marked as
hidden.
@@ -772,6 +780,98 @@ async def remove_dehydrated_device(self, user_id: str, device_id: str) -> bool:
)
return count >= 1

@wrap_as_background_process("prune_old_outbound_device_pokes")
async def _prune_old_outbound_device_pokes(
self, prune_age: int = 24 * 60 * 60 * 1000
) -> None:
"""Delete old entries out of the device_lists_outbound_pokes to ensure
that we don't fill up due to dead servers.

Normally, we try to send device updates as a delta since a previous known point:
this is done by setting the prev_id in the m.device_list_update EDU. However,
for that to work, we have to have a complete record of each change to
each device, which can add up to quite a lot of data.

An alternative mechanism is that, if the remote server sees that it has missed
an entry in the stream_id sequence for a given user, it will request a full
list of that user's devices. Hence, we can reduce the amount of data we have to
store (and transmit in some future transaction), by clearing almost everything
for a given destination out of the database, and having the remote server
resync.

All we need to do is make sure we keep at least one row for each
(user, destination) pair, to remind us to send an m.device_list_update EDU for
that user when the destination comes back. It doesn't matter which device
we keep.
"""
yesterday = self._clock.time_msec() - prune_age

def _prune_txn(txn):
# look for (user, destination) pairs which have an update older than
# the cutoff.
#
# For each pair, we also need to know the most recent stream_id, and
# an arbitrary device_id at that stream_id.
select_sql = """
SELECT
dlop1.destination,
dlop1.user_id,
MAX(dlop1.stream_id) AS stream_id,
(SELECT MIN(dlop2.device_id) AS device_id FROM
device_lists_outbound_pokes dlop2
WHERE dlop2.destination = dlop1.destination AND
dlop2.user_id=dlop1.user_id AND
dlop2.stream_id=MAX(dlop1.stream_id)
)
FROM device_lists_outbound_pokes dlop1
GROUP BY destination, user_id
HAVING min(ts) < ? AND count(*) > 1
"""

txn.execute(select_sql, (yesterday,))
rows = txn.fetchall()

if not rows:
return

logger.info(
"Pruning old outbound device list updates for %i users/destinations: %s",
len(rows),
shortstr((row[0], row[1]) for row in rows),
)

# we want to keep the update with the highest stream_id for each user.
#
# there might be more than one update (with different device_ids) with the
# same stream_id, so we also delete all but one of the rows with the max stream_id.
delete_sql = """
DELETE FROM device_lists_outbound_pokes
WHERE destination = ? AND user_id = ? AND (
stream_id < ? OR
(stream_id = ? AND device_id != ?)
)
"""
count = 0
for (destination, user_id, stream_id, device_id) in rows:
txn.execute(
delete_sql, (destination, user_id, stream_id, stream_id, device_id)
)
count += txn.rowcount

# Since we've deleted unsent deltas, we need to remove the entry
# of last successful sent so that the prev_ids are correctly set.
sql = """
DELETE FROM device_lists_outbound_last_success
WHERE destination = ? AND user_id = ?
"""
txn.executemany(sql, ((row[0], row[1]) for row in rows))

logger.info("Pruned %d device list outbound pokes", count)

await self.db_pool.runInteraction(
"_prune_old_outbound_device_pokes", _prune_txn,
)


class DeviceBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
@@ -908,8 +1008,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
name="device_id_exists", keylen=2, max_entries=10000
)

self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)

async def store_device(
self, user_id: str, device_id: str, initial_device_display_name: Optional[str]
) -> bool:
@@ -1267,95 +1365,3 @@ def _add_device_outbound_poke_to_stream_txn(
for device_id in device_ids
],
)

def _prune_old_outbound_device_pokes(self, prune_age: int = 24 * 60 * 60 * 1000):
"""Delete old entries out of the device_lists_outbound_pokes to ensure
that we don't fill up due to dead servers.

Normally, we try to send device updates as a delta since a previous known point:
this is done by setting the prev_id in the m.device_list_update EDU. However,
for that to work, we have to have a complete record of each change to
each device, which can add up to quite a lot of data.

An alternative mechanism is that, if the remote server sees that it has missed
an entry in the stream_id sequence for a given user, it will request a full
list of that user's devices. Hence, we can reduce the amount of data we have to
store (and transmit in some future transaction), by clearing almost everything
for a given destination out of the database, and having the remote server
resync.

All we need to do is make sure we keep at least one row for each
(user, destination) pair, to remind us to send an m.device_list_update EDU for
that user when the destination comes back. It doesn't matter which device
we keep.
"""
yesterday = self._clock.time_msec() - prune_age

def _prune_txn(txn):
# look for (user, destination) pairs which have an update older than
# the cutoff.
#
# For each pair, we also need to know the most recent stream_id, and
# an arbitrary device_id at that stream_id.
select_sql = """
SELECT
dlop1.destination,
dlop1.user_id,
MAX(dlop1.stream_id) AS stream_id,
(SELECT MIN(dlop2.device_id) AS device_id FROM
device_lists_outbound_pokes dlop2
WHERE dlop2.destination = dlop1.destination AND
dlop2.user_id=dlop1.user_id AND
dlop2.stream_id=MAX(dlop1.stream_id)
)
FROM device_lists_outbound_pokes dlop1
GROUP BY destination, user_id
HAVING min(ts) < ? AND count(*) > 1
"""

txn.execute(select_sql, (yesterday,))
rows = txn.fetchall()

if not rows:
return

logger.info(
"Pruning old outbound device list updates for %i users/destinations: %s",
len(rows),
shortstr((row[0], row[1]) for row in rows),
)

# we want to keep the update with the highest stream_id for each user.
#
# there might be more than one update (with different device_ids) with the
# same stream_id, so we also delete all but one of the rows with the max stream_id.
delete_sql = """
DELETE FROM device_lists_outbound_pokes
WHERE destination = ? AND user_id = ? AND (
stream_id < ? OR
(stream_id = ? AND device_id != ?)
)
"""
count = 0
for (destination, user_id, stream_id, device_id) in rows:
txn.execute(
delete_sql, (destination, user_id, stream_id, stream_id, device_id)
)
count += txn.rowcount

# Since we've deleted unsent deltas, we need to remove the entry
# of last successful sent so that the prev_ids are correctly set.
sql = """
DELETE FROM device_lists_outbound_last_success
WHERE destination = ? AND user_id = ?
"""
txn.executemany(sql, ((row[0], row[1]) for row in rows))

logger.info("Pruned %d device list outbound pokes", count)

return run_as_background_process(
"prune_old_outbound_device_pokes",
self.db_pool.runInteraction,
"_prune_old_outbound_device_pokes",
_prune_txn,
)
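
The docstring on `_prune_old_outbound_device_pokes` above boils down to one rule: for every (destination, user_id) pair whose oldest outbound poke is older than the cutoff and which has more than one row, keep a single row at the highest stream_id and delete the rest (along with the pair's `device_lists_outbound_last_success` entry). A small pure-Python sketch of that selection on toy rows, shown only to illustrate the SQL's behaviour; the `Row` alias and `rows_to_prune` helper are illustrative, not part of Synapse.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Each toy row mirrors device_lists_outbound_pokes:
# (destination, user_id, stream_id, device_id, ts)
Row = Tuple[str, str, int, str, int]


def rows_to_prune(rows: List[Row], cutoff_ts: int) -> List[Row]:
    """Rows the prune job would delete: for each (destination, user_id) pair
    with more than one row and an oldest row older than the cutoff, everything
    except one row at the maximum stream_id."""
    by_pair: Dict[Tuple[str, str], List[Row]] = defaultdict(list)
    for row in rows:
        by_pair[(row[0], row[1])].append(row)

    to_delete: List[Row] = []
    for pair_rows in by_pair.values():
        if len(pair_rows) <= 1 or min(r[4] for r in pair_rows) >= cutoff_ts:
            continue  # nothing to prune for this pair
        max_stream = max(r[2] for r in pair_rows)
        # The SQL keeps the row with MIN(device_id) at the max stream_id;
        # everything else for the pair is deleted.
        keeper = min((r for r in pair_rows if r[2] == max_stream), key=lambda r: r[3])
        to_delete.extend(r for r in pair_rows if r is not keeper)
    return to_delete


rows = [
    ("remote.example", "@alice:local", 1, "DEV1", 100),
    ("remote.example", "@alice:local", 2, "DEV2", 200),
    ("remote.example", "@bob:local", 5, "DEV9", 100),  # single row: always kept
]
# With a cutoff of 150, only Alice's stream_id=1 row is pruned; one row per
# pair survives, so the remote server is still reminded to resync.
assert rows_to_prune(rows, cutoff_ts=150) == [
    ("remote.example", "@alice:local", 1, "DEV1", 100)
]
```
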
60 changes: 31 additions & 29 deletions synapse/storage/databases/main/event_federation.py
@@ -19,7 +19,7 @@

from synapse.api.errors import StoreError
from synapse.events import EventBase
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventsWorkerStore
@@ -32,6 +32,14 @@


class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

if hs.config.run_background_tasks:
hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)

async def get_auth_chain(
self, event_ids: Collection[str], include_given: bool = False
) -> List[EventBase]:
@@ -586,6 +594,28 @@ async def get_successor_events(self, event_ids: Iterable[str]) -> List[str]:

return [row["event_id"] for row in rows]

@wrap_as_background_process("delete_old_forward_extrem_cache")
async def _delete_old_forward_extrem_cache(self) -> None:
def _delete_old_forward_extrem_cache_txn(txn):
# Delete entries older than a month, while making sure we don't delete
# the only entries for a room.
sql = """
DELETE FROM stream_ordering_to_exterm
WHERE
room_id IN (
SELECT room_id
FROM stream_ordering_to_exterm
WHERE stream_ordering > ?
) AND stream_ordering < ?
"""
txn.execute(
sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago)
)

await self.db_pool.runInteraction(
"_delete_old_forward_extrem_cache", _delete_old_forward_extrem_cache_txn,
)


class EventFederationStore(EventFederationWorkerStore):
""" Responsible for storing and serving up the various graphs associated
@@ -606,34 +636,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
)

hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)

def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn):
# Delete entries older than a month, while making sure we don't delete
# the only entries for a room.
sql = """
DELETE FROM stream_ordering_to_exterm
WHERE
room_id IN (
SELECT room_id
FROM stream_ordering_to_exterm
WHERE stream_ordering > ?
) AND stream_ordering < ?
"""
txn.execute(
sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago)
)

return run_as_background_process(
"delete_old_forward_extrem_cache",
self.db_pool.runInteraction,
"_delete_old_forward_extrem_cache",
_delete_old_forward_extrem_cache_txn,
)

async def clean_room_for_join(self, room_id):
return await self.db_pool.runInteraction(
"clean_room_for_join", self._clean_room_for_join_txn, room_id
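
The relocated `_delete_old_forward_extrem_cache` keeps the guarantee stated in its comment: rows older than `stream_ordering_month_ago` are only removed for rooms that also have a newer row, so no room ever loses its last cached forward extremities. A tiny illustrative sketch of that rule on toy data; the `rows_to_delete` helper is an assumption for illustration, not the real schema or code.

```python
from typing import List, Tuple


# Each toy row mirrors stream_ordering_to_exterm: (room_id, stream_ordering)
def rows_to_delete(rows: List[Tuple[str, int]], month_ago: int) -> List[Tuple[str, int]]:
    """Rows the cleanup would remove: entries older than the cutoff, but only
    in rooms that still have at least one newer entry."""
    rooms_with_recent = {room for room, ordering in rows if ordering > month_ago}
    return [
        (room, ordering)
        for room, ordering in rows
        if room in rooms_with_recent and ordering < month_ago
    ]


rows = [("!busy:hs", 10), ("!busy:hs", 900), ("!quiet:hs", 5)]
# With a cutoff of 500, only the busy room's stale row is dropped; the quiet
# room keeps its only entry.
assert rows_to_delete(rows, month_ago=500) == [("!busy:hs", 10)]
```
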