This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add basic read/write lock #15782

Merged
merged 9 commits on Jul 5, 2023
Changes from 1 commit
156 changes: 142 additions & 14 deletions synapse/storage/databases/main/lock.py
@@ -25,6 +25,7 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.util import Clock
from synapse.util.stringutils import random_string

@@ -68,12 +69,18 @@ def __init__(
self._reactor = hs.get_reactor()
self._instance_name = hs.get_instance_id()

# A map from `(lock_name, lock_key)` to the token of any locks that we
# think we currently hold.
self._live_tokens: WeakValueDictionary[
# A map from `(lock_name, lock_key)` to lock that we think we
# currently hold.
self._live_lock_tokens: WeakValueDictionary[
Tuple[str, str], Lock
] = WeakValueDictionary()

# A map from `(lock_name, lock_key, token)` to read/write lock that we
# think we currently hold.
self._live_read_write_lock_tokens: WeakValueDictionary[
Tuple[str, str, str], Lock
] = WeakValueDictionary()

# When we shut down we want to remove the locks. Technically this can
# lead to a race, as we may drop the lock while we are still processing.
# However, a) it should be a small window, b) the lock is best effort
@@ -91,11 +98,13 @@ async def _on_shutdown(self) -> None:
"""Called when the server is shutting down"""
logger.info("Dropping held locks due to shutdown")

# We need to take a copy of the tokens dict as dropping the locks will
# cause the dictionary to change.
locks = dict(self._live_tokens)
# We need to take a copy of the locks as dropping the locks will cause
# the dictionary to change.
locks = list(self._live_lock_tokens.values()) + list(
self._live_read_write_lock_tokens.values()
)

for lock in locks.values():
for lock in locks:
await lock.release()

logger.info("Dropped locks due to shutdown")
Expand All @@ -122,7 +131,7 @@ async def _try_acquire_lock(
"""

# Check if this process has taken out a lock and if it's still valid.
lock = self._live_tokens.get((lock_name, lock_key))
lock = self._live_lock_tokens.get((lock_name, lock_key))
if lock and await lock.is_still_valid():
return None

@@ -176,12 +185,113 @@ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
self._reactor,
self._clock,
self,
read_write=False,
lock_name=lock_name,
lock_key=lock_key,
token=token,
)

self._live_tokens[(lock_name, lock_key)] = lock
self._live_lock_tokens[(lock_name, lock_key)] = lock

return lock

async def try_acquire_read_write_lock(
self,
lock_name: str,
lock_key: str,
write: bool,
) -> Optional["Lock"]:
"""Try to acquire a lock for the given name/key. Will return an async
context manager if the lock is successfully acquired, which *must* be
used (otherwise the lock will leak).
"""

now = self._clock.time_msec()
token = random_string(6)

def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None:
# We attempt to acquire the lock by inserting into
# `worker_read_write_locks` and seeing if that fails any
# constraints. If it doesn't then we have acquired the lock,
# otherwise we haven't.
#
# Before that though we clear the table of any stale locks.

delete_sql = """
DELETE FROM worker_read_write_locks
WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
"""

insert_sql = """
INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (lock_name, lock_key, token)
DO UPDATE
SET
last_renewed_ts = EXCLUDED.last_renewed_ts
"""

if isinstance(self.database_engine, PostgresEngine):
# For Postgres we can send these queries at the same time.
txn.execute(
delete_sql + ";" + insert_sql,
(
# DELETE args
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
# UPSERT args
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)
else:
# For SQLite these need to be two queries.
txn.execute(
delete_sql,
(
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
),
)
txn.execute(
insert_sql,
(
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)

return

try:
await self.db_pool.runInteraction(
"try_acquire_read_write_lock",
_try_acquire_read_write_lock_txn,
)
except self.database_engine.module.IntegrityError:
return None

lock = Lock(
self._reactor,
self._clock,
self,
read_write=True,
lock_name=lock_name,
lock_key=lock_key,
token=token,
)

self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock

return lock

@@ -212,20 +322,31 @@ def __init__(
reactor: IReactorCore,
clock: Clock,
store: LockStore,
read_write: bool,
lock_name: str,
lock_key: str,
token: str,
) -> None:
self._reactor = reactor
self._clock = clock
self._store = store
self._read_write = read_write
self._lock_name = lock_name
self._lock_key = lock_key

self._token = token

self._table = "worker_read_write_locks" if read_write else "worker_locks"

self._looping_call = clock.looping_call(
self._renew, _RENEWAL_INTERVAL_MS, store, clock, lock_name, lock_key, token
self._renew,
_RENEWAL_INTERVAL_MS,
store,
clock,
read_write,
lock_name,
lock_key,
token,
)

self._dropped = False
@@ -235,6 +356,7 @@ def __init__(
async def _renew(
store: LockStore,
clock: Clock,
read_write: bool,
lock_name: str,
lock_key: str,
token: str,
@@ -245,8 +367,9 @@ async def _renew(
don't end up with a reference to `self` in the reactor, which would stop
this from being cleaned up if we dropped the context manager.
"""
table = "worker_read_write_locks" if read_write else "worker_locks"
await store.db_pool.simple_update(
table="worker_locks",
table=table,
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
Expand All @@ -259,7 +382,7 @@ async def _renew(
async def is_still_valid(self) -> bool:
"""Check if the lock is still held by us"""
last_renewed_ts = await self._store.db_pool.simple_select_one_onecol(
table="worker_locks",
table=self._table,
keyvalues={
"lock_name": self._lock_name,
"lock_key": self._lock_key,
@@ -301,7 +424,7 @@ async def release(self) -> None:
self._looping_call.stop()

await self._store.db_pool.simple_delete(
table="worker_locks",
table=self._table,
keyvalues={
"lock_name": self._lock_name,
"lock_key": self._lock_key,
Expand All @@ -310,7 +433,12 @@ async def release(self) -> None:
desc="drop_lock",
)

self._store._live_tokens.pop((self._lock_name, self._lock_key), None)
if self._read_write:
self._store._live_read_write_lock_tokens.pop(
(self._lock_name, self._lock_key, self._token), None
)
else:
self._store._live_lock_tokens.pop((self._lock_name, self._lock_key), None)

self._dropped = True

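For illustration, here is a minimal caller-side sketch of the new `try_acquire_read_write_lock` API. This is hypothetical code, not part of this diff: the function name, the lock name/key strings, and the `hs.get_datastores().main` accessor are assumptions made for the example. The key point, per the docstring above, is that the returned lock must be used as an async context manager so it is released (and stops being renewed) when the work finishes.

```python
# Hypothetical caller sketch; names and accessor are assumptions, not part of this PR.
async def do_work_under_lock(hs) -> None:
    store = hs.get_datastores().main

    # Request the lock in read mode; write=True would request exclusive access.
    lock = await store.try_acquire_read_write_lock("example_lock", "example_key", write=False)
    if lock is None:
        # Another worker holds the lock in a conflicting mode; give up or retry later.
        return

    # The returned lock *must* be used as an async context manager, otherwise it leaks.
    async with lock:
        ...  # do the protected work; the lock is renewed in the background

    # On exit the lock row is deleted and the renewal loop stops.
```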
90 changes: 90 additions & 0 deletions synapse/storage/schema/main/delta/78/03_read_write_locks.sql
@@ -0,0 +1,90 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


-- We implement read/write style locks by using two tables with mutual foreign
-- key constraints. Note that this implementation is vulnerable to starving
-- writers if read locks repeatedly get acquired.
--
-- The first table (`worker_read_write_locks_mode`) indicates that a given lock
-- has either been acquired in read mode *or* write mode, but not both. This is
-- enforced by the unique constraint. Each instance of a lock being acquired is
-- associated with a random `token`.
--
-- The second table (`worker_read_write_locks`) tracks who has currently
-- acquired a given lock. It ensures that a lock can only be acquired once in
-- write mode by using unique constraints.
--
-- The foreign key from the second to first table enforces that for any given
-- lock the second table cannot have a mix of rows with read or write.
--
-- The foreign key from the first to second table enforces that we don't have a
-- row for a lock in the first table if not in the second table. (Only supported
-- on PostgreSQL).
--
--
-- Furthermore, we add some triggers to automatically keep the first table up to
-- date when inserting/deleting from the second table. This reduces the number
-- of round trips needed to acquire and release locks, as those operations
-- simply become an INSERT or DELETE. These triggers are added in a separate
-- delta due to database specific syntax.


-- A table to track whether a lock is currently acquired, and if so whether it's
-- in read or write mode.
CREATE TABLE worker_read_write_locks_mode (
lock_name TEXT NOT NULL,
lock_key TEXT NOT NULL,
-- Whether this lock is in read (false) or write (true) mode
write_lock BOOLEAN NOT NULL,
-- A token that has currently acquired the lock. We need this so that we can
-- add a foreign constraint from this table to `worker_read_write_locks`.
token TEXT NOT NULL
);

-- Ensure that we can only have one row per lock
CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
-- We need this (redundant) constraint so that we can have a foreign key
-- constraint against this table.
CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);


-- A table to track who has currently acquired a given lock.
CREATE TABLE worker_read_write_locks (
lock_name TEXT NOT NULL,
lock_key TEXT NOT NULL,
-- We write the instance name to ease manual debugging, we don't ever read
-- from it.
-- Note: instance names aren't guaranteed to be unique.
instance_name TEXT NOT NULL,
-- Whether this lock has been acquired in read (false) or write (true) mode.
-- We need this column so that we can add a foreign key constraint from this
-- table to `worker_read_write_locks_mode`.
write_lock BOOLEAN NOT NULL,
-- A random string generated each time an instance takes out a lock. Used by
-- the instance to tell whether the lock is still held by it (e.g. in the
-- case where the process stalls for a long time the lock may time out and
-- be taken out by another instance, at which point the original instance
-- can tell it no longer holds the lock as the tokens no longer match).
token TEXT NOT NULL,
last_renewed_ts BIGINT NOT NULL,

-- This constraint ensures that a given lock has only been acquired in read
-- xor write mode, but not both.
FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock)
);

CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
-- Ensures that only one instance can acquire a lock in write mode at a time.
CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
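To make the design described in the comments above concrete, here is an illustrative walk-through of how the constraints enforce "read xor write" for a single lock. All values (lock name, key, tokens, instance names, timestamps) are made up, and in practice the mode row is maintained by the triggers added in the follow-up delta rather than inserted by hand.

```sql
-- Illustrative walk-through only (hypothetical values, not part of this delta).

-- A first reader takes the lock in read mode.
INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
    VALUES ('example_lock', 'example_key', FALSE, 'tokenA');
INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
    VALUES ('example_lock', 'example_key', FALSE, 'worker1', 'tokenA', 1000);

-- A second reader can join: (lock_name, lock_key, write_lock = FALSE) still
-- matches the mode row, and the partial write-lock index does not apply.
INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
    VALUES ('example_lock', 'example_key', FALSE, 'worker2', 'tokenB', 1000);

-- A writer is rejected: there is no (lock_name, lock_key, write_lock = TRUE)
-- row in worker_read_write_locks_mode, so the foreign key constraint fails,
-- and the unique index on the mode table prevents adding one while the
-- read-mode row exists.
INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
    VALUES ('example_lock', 'example_key', TRUE, 'worker3', 'tokenC', 1000);
```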
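The triggers mentioned in the comments land in a separate, database-specific delta. As a rough sketch of the idea only (PostgreSQL syntax, not the actual follow-up delta), an insert trigger can create the mode row on demand so that acquiring a lock stays a single INSERT; a matching delete trigger would remove the mode row once the last holder releases the lock.

```sql
-- Hedged sketch of the trigger idea (PostgreSQL); the real triggers are added
-- in a separate delta and also need a SQLite variant.
CREATE OR REPLACE FUNCTION upsert_read_write_lock_mode() RETURNS trigger AS $$
BEGIN
    -- Create the mode row for this lock if it does not exist yet; if it does,
    -- the foreign key on worker_read_write_locks decides whether the new
    -- holder is compatible with the current mode.
    INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
        ON CONFLICT (lock_name, lock_key) DO NOTHING;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER upsert_read_write_lock_mode_trigger
    BEFORE INSERT ON worker_read_write_locks
    FOR EACH ROW EXECUTE PROCEDURE upsert_read_write_lock_mode();
```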