Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce serialization errors in MultiWriterIdGen #8456

Merged
merged 8 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8456.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce number of serialization errors of `MultiWriterIdGenerator._update_table`.
69 changes: 63 additions & 6 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,24 @@ def new_transaction(
*args: Any,
**kwargs: Any
) -> R:
"""Start a new database transaction with the given connection.

Note: The given func may be called multiple times under certain
failure modes. This is normally fine when in a standard transaction,
but care must be taken if the connection is in `autocommit` mode that
the function will correctly handle being aborted and retried half way
through its execution.

Args:
conn
desc
after_callbacks
exception_callbacks
func
*args
**kwargs
"""

start = monotonic_time()
txn_id = self._TXN_ID

Expand Down Expand Up @@ -566,7 +584,12 @@ def new_transaction(
sql_txn_timer.labels(desc).observe(duration)

async def runInteraction(
self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any
self,
desc: str,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R:
"""Starts a transaction on the database and runs a given function

Expand All @@ -576,6 +599,18 @@ async def runInteraction(
database transaction (twisted.enterprise.adbapi.Transaction) as
its first argument, followed by `args` and `kwargs`.

db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transactions
that are only a single query.

Currently, this is only implemented for Postgres. SQLite will still
run the function inside a transaction.

WARNING: This means that if func fails half way through then
the changes will *not* be rolled back. `func` may also get
called multiple times if the transaction is retried, so must
correctly handle that case.

args: positional args to pass to `func`
kwargs: named args to pass to `func`

Expand All @@ -596,6 +631,7 @@ async def runInteraction(
exception_callbacks,
func,
*args,
db_autocommit=db_autocommit,
**kwargs
)

Expand All @@ -609,7 +645,11 @@ async def runInteraction(
return cast(R, result)

async def runWithConnection(
self, func: "Callable[..., R]", *args: Any, **kwargs: Any
self,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R:
"""Wraps the .runWithConnection() method on the underlying db_pool.

Expand All @@ -618,6 +658,9 @@ async def runWithConnection(
database connection (twisted.enterprise.adbapi.Connection) as
its first argument, followed by `args` and `kwargs`.
args: positional args to pass to `func`
db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transaction
that are only a single query. Currently only affects postgres.
kwargs: named args to pass to `func`

Returns:
Expand All @@ -633,6 +676,13 @@ async def runWithConnection(
start_time = monotonic_time()

def inner_func(conn, *args, **kwargs):
# We shouldn't be in a transaction. If we are then something
# somewhere hasn't committed after doing work. (This is likely only
# possible during startup, as `run*` will ensure changes are
# committed/rolled back before putting the connection back in the
# pool).
assert not self.engine.in_transaction(conn)

with LoggingContext("runWithConnection", parent_context) as context:
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
Expand All @@ -642,10 +692,17 @@ def inner_func(conn, *args, **kwargs):
logger.debug("Reconnecting closed database connection")
conn.reconnect()

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
try:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, True)

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
finally:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, False)

return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
Expand Down
17 changes: 17 additions & 0 deletions synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,20 @@ def server_version(self) -> str:
"""Gets a string giving the server version. For example: '3.22.0'
"""
...

@abc.abstractmethod
def in_transaction(self, conn: Connection) -> bool:
"""Whether the connection is currently in a transaction.
"""
...

@abc.abstractmethod
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
"""Attempt to set the connections autocommit mode.

When True queries are run outside of transactions.

Note: This has no effect on SQLite3, so callers still need to
commit/rollback the connections.
"""
...
10 changes: 9 additions & 1 deletion synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

import logging

from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
from synapse.storage.types import Connection

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -119,6 +120,7 @@ def on_new_connection(self, db_conn):
cursor.execute("SET synchronous_commit TO OFF")

cursor.close()
db_conn.commit()

@property
def can_native_upsert(self):
Expand Down Expand Up @@ -171,3 +173,9 @@ def server_version(self):
return "%i.%i" % (numver / 10000, numver % 10000)
else:
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)

def in_transaction(self, conn: Connection) -> bool:
return conn.status != self.module.extensions.STATUS_READY # type: ignore

def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
return conn.set_session(autocommit=autocommit) # type: ignore
10 changes: 10 additions & 0 deletions synapse/storage/engines/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import typing

from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Connection

if typing.TYPE_CHECKING:
import sqlite3 # noqa: F401
Expand Down Expand Up @@ -86,6 +87,7 @@ def on_new_connection(self, db_conn):

db_conn.create_function("rank", 1, _rank)
db_conn.execute("PRAGMA foreign_keys = ON;")
db_conn.commit()

def is_deadlock(self, error):
return False
Expand All @@ -105,6 +107,14 @@ def server_version(self):
"""
return "%i.%i.%i" % self.module.sqlite_version_info

def in_transaction(self, conn: Connection) -> bool:
return conn.in_transaction # type: ignore

def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
# Twisted doesn't let us set attributes on the connections, so we can't
# set the connection to autocommit mode.
Comment on lines +114 to +115
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's Twisted to do with it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connections we get are Twisted connections that wrap the underlying native connections, the wrapper implements __getattr__ but not __setattr__ so we can't set the autocommit flag

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, right.

I suppose you could delve into conn._connection, but I guess it doesn't really matter? It might be worth commenting somewhere (at the call site, maybe), that this is will only attempt to enable autocommit, so the transaction must do an explicit commit anyway?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we could go and pull out _connection, but then I'd want to handle the case where they rename that variable in a future release.

pass


# Following functions taken from: https://github.com/coleifer/peewee

Expand Down
12 changes: 11 additions & 1 deletion synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import PostgresSequenceGenerator

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -548,7 +549,7 @@ def _add_persisted_position(self, new_id: int):
# do.
break

def _update_stream_positions_table_txn(self, txn):
def _update_stream_positions_table_txn(self, txn: Cursor):
"""Update the `stream_positions` table with newly persisted position.
"""

Expand Down Expand Up @@ -598,10 +599,13 @@ class _MultiWriterCtxManager:
stream_ids = attr.ib(type=List[int], factory=list)

async def __aenter__(self) -> Union[int, List[int]]:
# It's safe to run this in autocommit mode as fetching values from a
# sequence ignores transaction semantics anyway.
self.stream_ids = await self.id_gen._db.runInteraction(
"_load_next_mult_id",
self.id_gen._load_next_mult_id_txn,
self.multiple_ids or 1,
db_autocommit=True,
)

# Assert the fetched ID is actually greater than any ID we've already
Expand Down Expand Up @@ -632,10 +636,16 @@ async def __aexit__(self, exc_type, exc, tb):
#
# We only do this on the success path so that the persisted current
# position points to a persisted row with the correct instance name.
#
# We do this in autocommit mode as a) the upsert works correctly outside
# transactions and b) reduces the amount of time the rows are locked
# for. If we don't do this then we'll often hit serialization errors due
# to the fact we default to REPEATABLE READ isolation levels.
if self.id_gen._writers:
await self.id_gen._db.runInteraction(
"MultiWriterIdGenerator._update_table",
self.id_gen._update_stream_positions_table_txn,
db_autocommit=True,
)

return False
1 change: 1 addition & 0 deletions tests/storage/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def runWithConnection(func, *args, **kwargs):
engine = create_engine(sqlite_config)
fake_engine = Mock(wraps=engine)
fake_engine.can_native_upsert = False
fake_engine.in_transaction.return_value = False

db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)
db._db_pool = self.db_pool
Expand Down