Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce serialization errors in MultiWriterIdGen #8456

Merged
merged 8 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8456.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce number of serialization errors of `MultiWriterIdGenerator._update_table`.
65 changes: 57 additions & 8 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,23 @@ def new_transaction(
exception_callbacks: List[_CallbackListEntry],
func: "Callable[..., R]",
*args: Any,
db_retry: bool = True,
**kwargs: Any
) -> R:
"""Start a new database transaction with the given connection.

Args:
conn
desc
after_callbacks
exception_callbacks
func
*args
db_retry: Whether to retry the transaction by calling `func` again.
This should be disabled if connection is in autocommit mode.
Copy link
Member

@richvdh richvdh Oct 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
db_retry: Whether to retry the transaction by calling `func` again.
This should be disabled if connection is in autocommit mode.
db_retry: Whether to retry the transaction after an OperationalError
or DatabaseError by calling `func` again.
This should be disabled if connection is in autocommit mode.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem with disabling this is that it's the thing that deals with postgres connections dropping (which can happen for annoying TCP reasons). I'm not sure it's safe to do: why is it required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike functions that are run in transactions, it's not necessarily true that it's safe to just re-run functions that are run in autocommit mode, since they might have done half the work already. I guess we can mandate that such functions should be safe to re-run? (All the existing usages are, for example.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done that.

**kwargs
"""

start = monotonic_time()
txn_id = self._TXN_ID

Expand Down Expand Up @@ -493,7 +508,7 @@ def new_transaction(
transaction_logger.warning(
"[TXN OPERROR] {%s} %s %d/%d", name, e, i, N,
)
if i < N:
if db_retry and i < N:
i += 1
try:
conn.rollback()
Expand All @@ -506,7 +521,7 @@ def new_transaction(
transaction_logger.warning(
"[TXN DEADLOCK] {%s} %d/%d", name, i, N
)
if i < N:
if db_retry and i < N:
i += 1
try:
conn.rollback()
Expand Down Expand Up @@ -566,7 +581,12 @@ def new_transaction(
sql_txn_timer.labels(desc).observe(duration)

async def runInteraction(
self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any
self,
desc: str,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R:
"""Starts a transaction on the database and runs a given function

Expand All @@ -576,6 +596,12 @@ async def runInteraction(
database transaction (twisted.enterprise.adbapi.Transaction) as
its first argument, followed by `args` and `kwargs`.

db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transactions
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
that are only a single query. Currently only affects postgres.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
WARNING: This means that if func fails half way through then
the changes will *not* be rolled back.

args: positional args to pass to `func`
kwargs: named args to pass to `func`

Expand All @@ -596,6 +622,8 @@ async def runInteraction(
exception_callbacks,
func,
*args,
db_autocommit=db_autocommit,
db_retry=not db_autocommit, # Don't retry in auto commit mode.
**kwargs
)

Expand All @@ -609,7 +637,11 @@ async def runInteraction(
return cast(R, result)

async def runWithConnection(
self, func: "Callable[..., R]", *args: Any, **kwargs: Any
self,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R:
"""Wraps the .runWithConnection() method on the underlying db_pool.

Expand All @@ -618,6 +650,9 @@ async def runWithConnection(
database connection (twisted.enterprise.adbapi.Connection) as
its first argument, followed by `args` and `kwargs`.
args: positional args to pass to `func`
db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transactions
that are only a single query. Currently only affects postgres.
kwargs: named args to pass to `func`

Returns:
Expand All @@ -633,6 +668,13 @@ async def runWithConnection(
start_time = monotonic_time()

def inner_func(conn, *args, **kwargs):
# We shouldn't be in a transaction. If we are then something
# somewhere hasn't committed after doing work. (This is likely only
# possible during startup, as `run*` will ensure changes are
# committed/rolled back before putting the connection back in the
# pool).
assert not self.engine.in_transaction(conn)

with LoggingContext("runWithConnection", parent_context) as context:
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
Expand All @@ -642,10 +684,17 @@ def inner_func(conn, *args, **kwargs):
logger.debug("Reconnecting closed database connection")
conn.reconnect()

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
try:
if db_autocommit:
self.engine.set_autocommit(conn, True)

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
finally:
if db_autocommit:
self.engine.set_autocommit(conn, False)

return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
Expand Down
14 changes: 14 additions & 0 deletions synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,17 @@ def server_version(self) -> str:
"""Gets a string giving the server version. For example: '3.22.0'
"""
...

@abc.abstractmethod
def in_transaction(self, conn: Connection) -> bool:
"""Whether the connection is currently in a transaction.
"""
...

@abc.abstractmethod
def set_autocommit(self, conn: Connection, autocommit: bool):
"""Set the connections autocommit mode.

When True queries are run outside of transactions.
"""
...
11 changes: 10 additions & 1 deletion synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

import logging

from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
from synapse.storage.types import Connection

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -107,6 +108,7 @@ def on_new_connection(self, db_conn):
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
db_conn.set_session(self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# Set the bytea output to escape, vs the default of hex
cursor = db_conn.cursor()
Expand All @@ -119,6 +121,7 @@ def on_new_connection(self, db_conn):
cursor.execute("SET synchronous_commit TO OFF")

cursor.close()
db_conn.commit()

@property
def can_native_upsert(self):
Expand Down Expand Up @@ -171,3 +174,9 @@ def server_version(self):
return "%i.%i" % (numver / 10000, numver % 10000)
else:
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)

def in_transaction(self, conn: Connection) -> bool:
return conn.status != self.module.extensions.STATUS_READY # type: ignore

def set_autocommit(self, conn: Connection, autocommit: bool):
return conn.set_session(autocommit=autocommit) # type: ignore
10 changes: 10 additions & 0 deletions synapse/storage/engines/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import typing

from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Connection

if typing.TYPE_CHECKING:
import sqlite3 # noqa: F401
Expand Down Expand Up @@ -86,6 +87,7 @@ def on_new_connection(self, db_conn):

db_conn.create_function("rank", 1, _rank)
db_conn.execute("PRAGMA foreign_keys = ON;")
db_conn.commit()

def is_deadlock(self, error):
return False
Expand All @@ -105,6 +107,14 @@ def server_version(self):
"""
return "%i.%i.%i" % self.module.sqlite_version_info

def in_transaction(self, conn: Connection) -> bool:
return conn.in_transaction # type: ignore

def set_autocommit(self, conn: Connection, autocommit: bool):
# Twisted doesn't let us set attributes on the connections, so we can't
# set the connection to autocommit mode.
Comment on lines +114 to +115
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's Twisted to do with it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connections we get are Twisted connections that wrap the underlying native connections; the wrapper implements __getattr__ but not __setattr__, so we can't set the autocommit flag.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, right.

I suppose you could delve into conn._connection, but I guess it doesn't really matter? It might be worth commenting somewhere (at the call site, maybe) that this will only attempt to enable autocommit, so the transaction must do an explicit commit anyway?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we could go and pull out _connection, but then I'd want to handle the case where they rename that variable in a future release.

pass


# Following functions taken from: https://github.com/coleifer/peewee

Expand Down
11 changes: 10 additions & 1 deletion synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ def _add_persisted_position(self, new_id: int):
# do.
break

def _update_stream_positions_table_txn(self, txn):
def _update_stream_positions_table_txn(self, txn: LoggingTransaction):
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""Update the `stream_positions` table with newly persisted position.
"""

Expand Down Expand Up @@ -598,10 +598,13 @@ class _MultiWriterCtxManager:
stream_ids = attr.ib(type=List[int], factory=list)

async def __aenter__(self) -> Union[int, List[int]]:
# It's safe to run this in autocommit mode as fetching values from a
# sequence ignores transaction semantics anyway.
self.stream_ids = await self.id_gen._db.runInteraction(
"_load_next_mult_id",
self.id_gen._load_next_mult_id_txn,
self.multiple_ids or 1,
db_autocommit=True,
)

# Assert the fetched ID is actually greater than any ID we've already
Expand Down Expand Up @@ -632,10 +635,16 @@ async def __aexit__(self, exc_type, exc, tb):
#
# We only do this on the success path so that the persisted current
# position points to a persisted row with the correct instance name.
#
# We do this in autocommit mode as a) the upsert works correctly outside
# transactions and b) it reduces the amount of time the rows are locked
# for. If we don't do this then we'll often hit serialization errors due
# to the fact we default to REPEATABLE READ isolation levels.
if self.id_gen._writers:
await self.id_gen._db.runInteraction(
"MultiWriterIdGenerator._update_table",
self.id_gen._update_stream_positions_table_txn,
db_autocommit=True,
)

return False
1 change: 1 addition & 0 deletions tests/storage/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def runWithConnection(func, *args, **kwargs):
engine = create_engine(sqlite_config)
fake_engine = Mock(wraps=engine)
fake_engine.can_native_upsert = False
fake_engine.in_transaction.return_value = False

db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)
db._db_pool = self.db_pool
Expand Down