From c0655035f17e685db86eafb8fe7405822c6f726d Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Wed, 18 Dec 2019 11:06:09 +0100 Subject: [PATCH 1/6] db: Consolidate access to the changes in a db We were passing them in separately, while we could just retrieve them from the db instance instead. --- lightningd/plugin_hook.c | 5 ++--- lightningd/plugin_hook.h | 5 ++--- wallet/db.c | 7 ++++++- wallet/db.h | 5 +++++ wallet/test/run-db.c | 2 +- wallet/test/run-wallet.c | 2 +- 6 files changed, 17 insertions(+), 9 deletions(-) diff --git a/lightningd/plugin_hook.c b/lightningd/plugin_hook.c index e1fd750a307f..f6a8f8b85da2 100644 --- a/lightningd/plugin_hook.c +++ b/lightningd/plugin_hook.c @@ -156,13 +156,14 @@ static void db_hook_response(const char *buffer, const jsmntok_t *toks, io_break(ph_req); } -void plugin_hook_db_sync(struct db *db, const char **changes, const char *final) +void plugin_hook_db_sync(struct db *db) { const struct plugin_hook *hook = &db_write_hook; struct jsonrpc_request *req; struct plugin_hook_request *ph_req; void *ret; + const char **changes = db_changes(db); if (!hook->plugin) return; @@ -177,8 +178,6 @@ void plugin_hook_db_sync(struct db *db, const char **changes, const char *final) json_array_start(req->stream, "writes"); for (size_t i = 0; i < tal_count(changes); i++) json_add_string(req->stream, NULL, changes[i]); - if (final) - json_add_string(req->stream, NULL, final); json_array_end(req->stream); jsonrpc_request_end(req); diff --git a/lightningd/plugin_hook.h b/lightningd/plugin_hook.h index 4869d69e9af3..d5ed4d659711 100644 --- a/lightningd/plugin_hook.h +++ b/lightningd/plugin_hook.h @@ -108,8 +108,7 @@ bool plugin_hook_unregister(struct plugin *plugin, const char *method); /* Unregister all hooks a plugin has registered for */ void plugin_hook_unregister_all(struct plugin *plugin); -/* Special sync plugin hook for db: changes[] are SQL statements, with optional - * final command appended. 
*/ -void plugin_hook_db_sync(struct db *db, const char **changes, const char *final); +/* Special sync plugin hook for db. */ +void plugin_hook_db_sync(struct db *db); #endif /* LIGHTNING_LIGHTNINGD_PLUGIN_HOOK_H */ diff --git a/wallet/db.c b/wallet/db.c index c6ed48b3fbaf..741747dafb8a 100644 --- a/wallet/db.c +++ b/wallet/db.c @@ -764,7 +764,7 @@ static void db_report_changes(struct db *db, const char *final, size_t min) assert(tal_count(db->changes) >= min); if (tal_count(db->changes) > min) - plugin_hook_db_sync(db, db->changes, final); + plugin_hook_db_sync(db); db->changes = tal_free(db->changes); } @@ -1399,3 +1399,8 @@ void db_changes_add(struct db_stmt *stmt, const char * expanded) tal_arr_expand(&db->changes, tal_strdup(db->changes, expanded)); } + +const char **db_changes(struct db *db) +{ + return db->changes; +} diff --git a/wallet/db.h b/wallet/db.h index 2a0c2b6565cb..7061a3db8d9d 100644 --- a/wallet/db.h +++ b/wallet/db.h @@ -224,4 +224,9 @@ struct db_stmt *db_prepare_v2_(const char *location, struct db *db, #define db_prepare_v2(db,query) \ db_prepare_v2_(__FILE__ ":" stringify(__LINE__), db, query) +/** + * Access pending changes that have been added to the current transaction. + */ +const char **db_changes(struct db *db); + #endif /* LIGHTNING_WALLET_DB_H */ diff --git a/wallet/test/run-db.c b/wallet/test/run-db.c index a6859bddcd7c..db9ddb953873 100644 --- a/wallet/test/run-db.c +++ b/wallet/test/run-db.c @@ -47,7 +47,7 @@ static void db_test_fatal(const char *fmt, ...) 
va_end(ap); } -void plugin_hook_db_sync(struct db *db UNNEEDED, const char **changes UNNEEDED, const char *final UNNEEDED) +void plugin_hook_db_sync(struct db *db UNNEEDED) { } diff --git a/wallet/test/run-wallet.c b/wallet/test/run-wallet.c index 02f8fe95c577..7010abd7a862 100644 --- a/wallet/test/run-wallet.c +++ b/wallet/test/run-wallet.c @@ -650,7 +650,7 @@ u8 *wire_sync_read(const tal_t *ctx UNNEEDED, int fd UNNEEDED) { return NULL; } -void plugin_hook_db_sync(struct db *db UNNEEDED, const char **changes UNNEEDED, const char *final UNNEEDED) +void plugin_hook_db_sync(struct db *db UNNEEDED) { } bool fromwire_hsm_get_channel_basepoints_reply(const void *p UNNEEDED, From d84f5a983a178e1ca918c1d7c2eb50f705e19ddf Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Wed, 18 Dec 2019 18:57:37 +0100 Subject: [PATCH 2/6] db: Move db_migrate transaction up one level We are about to do some more operations before committing, so moving this up allows us to reuse the same transaction. --- wallet/db.c | 8 +++++--- wallet/test/run-db.c | 5 +++-- wallet/test/run-wallet.c | 2 ++ 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/wallet/db.c b/wallet/db.c index 741747dafb8a..49951effe35c 100644 --- a/wallet/db.c +++ b/wallet/db.c @@ -905,8 +905,6 @@ static void db_migrate(struct lightningd *ld, struct db *db) int current, orig, available; struct db_stmt *stmt; - db_begin_transaction(db); - orig = current = db_get_version(db); available = ARRAY_SIZE(dbmigrations) - 1; @@ -947,14 +945,18 @@ static void db_migrate(struct lightningd *ld, struct db *db) tal_free(stmt); } - db_commit_transaction(db); } struct db *db_setup(const tal_t *ctx, struct lightningd *ld) { struct db *db = db_open(ctx, ld->wallet_dsn); db->log = new_log(db, ld->log_book, NULL, "database"); + + db_begin_transaction(db); + db_migrate(ld, db); + + db_commit_transaction(db); return db; } diff --git a/wallet/test/run-db.c b/wallet/test/run-db.c index db9ddb953873..bf326366cbd3 100644 --- 
a/wallet/test/run-db.c +++ b/wallet/test/run-db.c @@ -73,8 +73,9 @@ static bool test_empty_db_migrate(struct lightningd *ld) CHECK(db); db_begin_transaction(db); CHECK(db_get_version(db) == -1); - db_commit_transaction(db); db_migrate(ld, db); + db_commit_transaction(db); + db_begin_transaction(db); CHECK(db_get_version(db) == ARRAY_SIZE(dbmigrations) - 1); db_commit_transaction(db); @@ -118,9 +119,9 @@ static bool test_vars(struct lightningd *ld) struct db *db = create_test_db(); char *varname = "testvar"; CHECK(db); - db_migrate(ld, db); db_begin_transaction(db); + db_migrate(ld, db); /* Check default behavior */ CHECK(db_get_intvar(db, varname, 42) == 42); diff --git a/wallet/test/run-wallet.c b/wallet/test/run-wallet.c index 7010abd7a862..63db38de4bc2 100644 --- a/wallet/test/run-wallet.c +++ b/wallet/test/run-wallet.c @@ -747,7 +747,9 @@ static struct wallet *create_test_wallet(struct lightningd *ld, const tal_t *ctx w->bip32_base) == WALLY_OK); CHECK_MSG(w->db, "Failed opening the db"); + db_begin_transaction(w->db); db_migrate(ld, w->db); + db_commit_transaction(w->db); CHECK_MSG(!wallet_err, "DB migration failed"); w->max_channel_dbid = 0; From c6c543cd567015ee42d182ed7fce09d6b5d4a1df Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Wed, 18 Dec 2019 19:26:23 +0100 Subject: [PATCH 3/6] db: Add tracking of whether the current transaction is dirty --- wallet/db.c | 13 +++++++++++++ wallet/db_common.h | 4 ++++ 2 files changed, 17 insertions(+) diff --git a/wallet/db.c b/wallet/db.c index 49951effe35c..811fd29fee3b 100644 --- a/wallet/db.c +++ b/wallet/db.c @@ -763,6 +763,11 @@ static void db_report_changes(struct db *db, const char *final, size_t min) assert(db->changes); assert(tal_count(db->changes) >= min); + /* Having changes implies that we have a dirty TX. The opposite is + * currently not true, e.g., the postgres driver doesn't record + * changes yet. 
*/ + assert(!tal_count(db->changes) || db->dirty); + if (tal_count(db->changes) > min) plugin_hook_db_sync(db); db->changes = tal_free(db->changes); @@ -785,6 +790,9 @@ void db_begin_transaction_(struct db *db, const char *location) if (db->in_transaction) db_fatal("Already in transaction from %s", db->in_transaction); + /* No writes yet. */ + db->dirty = false; + db_prepare_for_changes(db); ok = db->config->begin_tx_fn(db); if (!ok) @@ -805,6 +813,7 @@ void db_commit_transaction(struct db *db) db_fatal("Failed to commit DB transaction: %s", db->error); db->in_transaction = NULL; + db->dirty = false; } static struct db_config *db_config_find(const char *dsn) @@ -1357,6 +1366,10 @@ void db_column_txid(struct db_stmt *stmt, int pos, struct bitcoin_txid *t) bool db_exec_prepared_v2(struct db_stmt *stmt TAKES) { bool ret = stmt->db->config->exec_fn(stmt); + + /* If this was a write we need to bump the data_version upon commit. */ + stmt->db->dirty = stmt->db->dirty || !stmt->query->readonly; + stmt->executed = true; list_del_from(&stmt->db->pending_statements, &stmt->list); diff --git a/wallet/db_common.h b/wallet/db_common.h index 0fa1a09ed9b9..3d56d6b7373e 100644 --- a/wallet/db_common.h +++ b/wallet/db_common.h @@ -30,6 +30,10 @@ struct db { char *error; struct log *log; + + /* Were there any modifying statements in the current transaction? + * Used to bump the data_version in the DB.*/ + bool dirty; }; struct db_query { From 73a5d5a4ed2a86a75bb0aee041ad6e9440a5a5db Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Wed, 18 Dec 2019 19:41:01 +0100 Subject: [PATCH 4/6] db: Add numeric data_version counter to count modifying transactions This counter is incremented on each dirty transaction. 
--- wallet/db.c | 1 + 1 file changed, 1 insertion(+) diff --git a/wallet/db.c b/wallet/db.c index 811fd29fee3b..ea67b4f1546f 100644 --- a/wallet/db.c +++ b/wallet/db.c @@ -589,6 +589,7 @@ static struct migration dbmigrations[] = { " SELECT id, 11, local_feerate_per_kw FROM channels WHERE funder = 1 and local_feerate_per_kw != remote_feerate_per_kw;"), NULL}, /* FIXME: Remove now-unused local_feerate_per_kw and remote_feerate_per_kw from channels */ + {SQL("INSERT INTO vars (name, intval) VALUES ('data_version', 0);"), NULL}, }; /* Leak tracking. */ From 1285d26d7558dd513fe1d2b93ff996907e818e2a Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Wed, 18 Dec 2019 19:45:24 +0100 Subject: [PATCH 5/6] db: Track the data_version in the database This increments the `data_version` upon committing dirty transactions, reads the last data_version upon startup, and tracks the number in memory in parallel to the DB (see next commit for rationale). Changelog-Changed: JSON-RPC: Added a `data_version` field to the `db_write` hook which returns a numeric transaction counter. 
--- lightningd/plugin_hook.c | 2 ++ wallet/db.c | 28 ++++++++++++++++++++++++++++ wallet/db.h | 3 +++ wallet/db_common.h | 4 ++++ wallet/test/run-db.c | 5 +++++ wallet/test/run-wallet.c | 1 + 6 files changed, 43 insertions(+) diff --git a/lightningd/plugin_hook.c b/lightningd/plugin_hook.c index f6a8f8b85da2..271389c49dee 100644 --- a/lightningd/plugin_hook.c +++ b/lightningd/plugin_hook.c @@ -175,6 +175,8 @@ void plugin_hook_db_sync(struct db *db) ph_req->hook = hook; ph_req->db = db; + json_add_num(req->stream, "data_version", db_data_version_get(db)); + json_array_start(req->stream, "writes"); for (size_t i = 0; i < tal_count(changes); i++) json_add_string(req->stream, NULL, changes[i]); diff --git a/wallet/db.c b/wallet/db.c index ea67b4f1546f..6d0bee788bc5 100644 --- a/wallet/db.c +++ b/wallet/db.c @@ -802,11 +802,27 @@ void db_begin_transaction_(struct db *db, const char *location) db->in_transaction = location; } +static void db_data_version_incr(struct db *db) +{ + struct db_stmt *stmt = db_prepare_v2( + db, SQL("UPDATE vars " + "SET intval = intval + 1 " + "WHERE name = 'data_version'")); + db_exec_prepared_v2(stmt); + tal_free(stmt); + db->data_version++; +} + void db_commit_transaction(struct db *db) { bool ok; assert(db->in_transaction); db_assert_no_outstanding_statements(db); + + /* Increment before reporting changes to an eventual plugin. 
*/ + if (db->dirty) + db_data_version_incr(db); + db_report_changes(db, NULL, 0); ok = db->config->commit_tx_fn(db); @@ -954,7 +970,18 @@ static void db_migrate(struct lightningd *ld, struct db *db) db_exec_prepared_v2(stmt); tal_free(stmt); } +} +u32 db_data_version_get(struct db *db) +{ + struct db_stmt *stmt; + u32 version; + stmt = db_prepare_v2(db, SQL("SELECT intval FROM vars WHERE name = 'data_version'")); + db_query_prepared(stmt); + db_step(stmt); + version = db_column_int(stmt, 0); + tal_free(stmt); + return version; } struct db *db_setup(const tal_t *ctx, struct lightningd *ld) @@ -966,6 +993,7 @@ struct db *db_setup(const tal_t *ctx, struct lightningd *ld) db_migrate(ld, db); + db->data_version = db_data_version_get(db); db_commit_transaction(db); return db; } diff --git a/wallet/db.h b/wallet/db.h index 7061a3db8d9d..5f352be2b5c6 100644 --- a/wallet/db.h +++ b/wallet/db.h @@ -229,4 +229,7 @@ struct db_stmt *db_prepare_v2_(const char *location, struct db *db, */ const char **db_changes(struct db *db); +/* Get the current data version. */ +u32 db_data_version_get(struct db *db); + #endif /* LIGHTNING_WALLET_DB_H */ diff --git a/wallet/db_common.h b/wallet/db_common.h index 3d56d6b7373e..69fdd9da028b 100644 --- a/wallet/db_common.h +++ b/wallet/db_common.h @@ -34,6 +34,10 @@ struct db { /* Were there any modifying statements in the current transaction? * Used to bump the data_version in the DB.*/ bool dirty; + + /* The current DB version we expect to update if changes are + * committed. 
*/ + u32 data_version; }; struct db_query { diff --git a/wallet/test/run-db.c b/wallet/test/run-db.c index bf326366cbd3..f45be567557d 100644 --- a/wallet/test/run-db.c +++ b/wallet/test/run-db.c @@ -63,6 +63,7 @@ static struct db *create_test_db(void) dsn = tal_fmt(NULL, "sqlite3://%s", filename); db = db_open(NULL, dsn); + db->data_version = 0; tal_free(dsn); return db; } @@ -107,6 +108,10 @@ static bool test_primitives(void) CHECK_MSG(db_err, "Failing SQL command"); tal_free(stmt); db_err = tal_free(db_err); + + /* We didn't migrate the DB, so don't have the vars table. Pretend we + * didn't change anything so we don't bump the data_version. */ + db->dirty = false; db_commit_transaction(db); CHECK(!db->in_transaction); tal_free(db); diff --git a/wallet/test/run-wallet.c b/wallet/test/run-wallet.c index 63db38de4bc2..7928c3e5592f 100644 --- a/wallet/test/run-wallet.c +++ b/wallet/test/run-wallet.c @@ -749,6 +749,7 @@ static struct wallet *create_test_wallet(struct lightningd *ld, const tal_t *ctx CHECK_MSG(w->db, "Failed opening the db"); db_begin_transaction(w->db); db_migrate(ld, w->db); + w->db->data_version = 0; db_commit_transaction(w->db); CHECK_MSG(!wallet_err, "DB migration failed"); w->max_channel_dbid = 0; From 80762481c840aaace4be176ad8c81df08d4cd1c2 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Wed, 18 Dec 2019 20:00:13 +0100 Subject: [PATCH 6/6] db: Turn the transaction counter into an optimistic lock The optimistic lock prevents multiple instances of c-lightning making concurrent modifications to the database. That would be unsafe as it messes up the state in the DB. The optimistic lock is implemented by checking whether a gated update on the previous value of the `data_version` actually results in an update. If that's not the case the DB has been changed under our feet. 
The lock provides linearizability of DB modifications: if a database is changed under the feet of a running process that process will `abort()`, which from a global point of view is as if it had crashed right after the last successful commit. Any process that also changed the DB must've started between the last successful commit and the unsuccessful one since otherwise its counters would not have matched (which would also have aborted that transaction). So this reduces all the possible timelines to an equivalent where the first process died, and the second process recovered from the DB. This is not that interesting for `sqlite3` where we are also protected via the PID file, but when running on multiple hosts against the same DB, e.g., with `postgres`, this protection becomes important. Changelog-Added: DB: Optimistic locking prevents instances from running concurrently against the same database, providing linear consistency to changes. --- tests/test_db.py | 26 ++++++++++++++++++++++++-- wallet/db.c | 19 ++++++++++++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/tests/test_db.py b/tests/test_db.py index ee942eed44cc..c1e0c4edc4ad 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,8 +1,10 @@ from fixtures import * # noqa: F401,F403 -from utils import wait_for, sync_blockheight, COMPAT from fixtures import TEST_NETWORK - +from pyln.client import RpcError +from utils import wait_for, sync_blockheight, COMPAT import os +import pytest +import time import unittest @@ -136,3 +138,23 @@ def test_scid_upgrade(node_factory, bitcoind): assert l1.db_query('SELECT short_channel_id from channels;') == [{'short_channel_id': '103x1x1'}] assert l1.db_query('SELECT failchannel from payments;') == [{'failchannel': '103x1x1'}] + + +def test_optimistic_locking(node_factory, bitcoind): + """Have a node run against a DB, then change it under its feet, crashing it. 
+ + We start a node, wait for it to settle its write so we have a window where + we can interfere, and watch the world burn (safely). + """ + l1 = node_factory.get_node(may_fail=True, allow_broken_log=True) + + sync_blockheight(bitcoind, [l1]) + l1.rpc.getinfo() + time.sleep(1) + l1.db.execute("UPDATE vars SET intval = intval + 1 WHERE name = 'data_version';") + + # Now trigger any DB write and we should be crashing. + with pytest.raises(RpcError, match=r'Connection to RPC server lost.'): + l1.rpc.newaddr() + + assert(l1.daemon.is_in_log(r'Optimistic lock on the database failed')) diff --git a/wallet/db.c b/wallet/db.c index 6d0bee788bc5..edfe3f4475e5 100644 --- a/wallet/db.c +++ b/wallet/db.c @@ -802,13 +802,30 @@ void db_begin_transaction_(struct db *db, const char *location) db->in_transaction = location; } +/* By making the update conditional on the current value we expect we + * are implementing an optimistic lock: if the update results in + * changes on the DB we know that the data_version did not change + * under our feet and no other transaction ran in the meantime. + * + * Notice that this update effectively locks the row, so that other + * operations attempting to change this outside the transaction will + * wait for this transaction to complete. The external change will + * ultimately fail the changes test below, it'll just delay its abort + * until our transaction is committed. + */ static void db_data_version_incr(struct db *db) { struct db_stmt *stmt = db_prepare_v2( db, SQL("UPDATE vars " "SET intval = intval + 1 " - "WHERE name = 'data_version'")); + "WHERE name = 'data_version'" + " AND intval = ?")); + db_bind_int(stmt, 0, db->data_version); db_exec_prepared_v2(stmt); + if (db_count_changes(stmt) != 1) + fatal("Optimistic lock on the database failed. There may be a " + "concurrent access to the database. Aborting since " + "concurrent access is unsafe."); tal_free(stmt); db->data_version++; }