From 5c7c21b6c5ad5326ff5ef3fdf874d3df7204d1dd Mon Sep 17 00:00:00 2001
From: adiholden
Date: Mon, 8 Jul 2024 17:21:12 +0300
Subject: [PATCH] fix(bug): crash on takeover and info replication (#3282)

Signed-off-by: adi_holden
---
 src/server/server_family.cc         | 22 ++++++++++---------
 tests/dragonfly/instance.py         |  2 +-
 tests/dragonfly/replication_test.py | 34 ++++++++++++++---------------
 3 files changed, 30 insertions(+), 28 deletions(-)

diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 341f6ba822c0..b034acf3661e 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -2258,9 +2258,12 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
   }
 
   if (should_enter("REPLICATION")) {
-    ServerState& etl = *ServerState::tlocal();
-
-    if (etl.is_master) {
+    unique_lock lk(replicaof_mu_);
+    // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
+    // ensuring eventual consistency of is_master. When determining if the server is a replica and
+    // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
+    // insufficient in this scenario.
+    if (!replica_) {
       append("role", "master");
       append("connected_slaves", m.facade_stats.conn_stats.num_replicas);
       const auto& replicas = m.replication_metrics;
@@ -2274,10 +2277,6 @@
     } else {
       append("role", GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica");
 
-      // The replica pointer can still be mutated even while master=true,
-      // we don't want to drop the replica object in this fiber
-      unique_lock lk{replicaof_mu_};
-
       auto replication_info_cb = [&](Replica::Info rinfo) {
         append("master_host", rinfo.host);
         append("master_port", rinfo.port);
@@ -2737,8 +2736,12 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
 
 void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
   auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
-  ServerState& etl = *ServerState::tlocal();
-  if (etl.is_master) {
+  unique_lock lk(replicaof_mu_);
+  // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
+  // ensuring eventual consistency of is_master. When determining if the server is a replica and
+  // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
+  // insufficient in this scenario.
+  if (!replica_) {
     rb->StartArray(2);
     rb->SendBulkString("master");
     auto vec = dfly_cmd_->GetReplicasRoleInfo();
@@ -2751,7 +2754,6 @@
     }
   } else {
-    unique_lock lk{replicaof_mu_};
     rb->StartArray(4 + cluster_replicas_.size() * 3);
     rb->SendBulkString(GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave"
                                                                          : "replica");
diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py
index 50ce582080b0..f2744d7c99c9 100644
--- a/tests/dragonfly/instance.py
+++ b/tests/dragonfly/instance.py
@@ -336,7 +336,7 @@ def create(self, existing_port=None, **kwargs) -> DflyInstance:
         args.setdefault("noversion_check", None)
         # MacOs does not set it automatically, so we need to set it manually
         args.setdefault("maxmemory", "8G")
-        vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1"
+        vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,dflycmd=1"
         args.setdefault("vmodule", vmod)
         args.setdefault("jsonpathv2")
 
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index 3feee76cad3d..bab7d636b623 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -1151,10 +1151,7 @@ async def test_readonly_script(df_factory):
 @pytest.mark.parametrize("master_threads, replica_threads", take_over_cases)
 @pytest.mark.asyncio
 async def test_take_over_counters(df_factory, master_threads, replica_threads):
-    master = df_factory.create(
-        proactor_threads=master_threads,
-        logtostderr=True,
-    )
+    master = df_factory.create(proactor_threads=master_threads)
     replica1 = df_factory.create(proactor_threads=replica_threads)
     replica2 = df_factory.create(proactor_threads=replica_threads)
     replica3 = df_factory.create(proactor_threads=replica_threads)
@@ -1214,11 +1211,7 @@ async def test_take_over_seeder(
     request, df_factory, df_seeder_factory, master_threads, replica_threads
 ):
     tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
-    master = df_factory.create(
-        proactor_threads=master_threads,
-        dbfilename=f"dump_{tmp_file_name}",
-        logtostderr=True,
-    )
+    master = df_factory.create(proactor_threads=master_threads, dbfilename=f"dump_{tmp_file_name}")
     replica = df_factory.create(proactor_threads=replica_threads)
 
     df_factory.start_all([master, replica])
@@ -1229,17 +1222,27 @@
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
     await wait_available_async(c_replica)
 
-    async def seed():
-        await seeder.run(target_ops=3000)
+    fill_task = asyncio.create_task(seeder.run())
+
+    stop_info = False
 
-    fill_task = asyncio.create_task(seed())
+    async def info_task():
+        my_client = replica.client()
+        while not stop_info:
+            info = await my_client.info("replication")
+            await asyncio.sleep(0.5)
+
+    info_task = asyncio.create_task(info_task())
 
     # Give the seeder a bit of time.
-    await asyncio.sleep(1)
+    await asyncio.sleep(3)
+    logging.debug("running repltakover")
     await c_replica.execute_command(f"REPLTAKEOVER 5 SAVE")
+    logging.debug("after running repltakover")
     seeder.stop()
 
     assert await c_replica.execute_command("role") == ["master", []]
+    stop_info = True
 
     # Need to wait a bit to give time to write the shutdown snapshot
     await asyncio.sleep(1)
@@ -1258,10 +1261,7 @@
 @pytest.mark.parametrize("master_threads, replica_threads", [[4, 4]])
 @pytest.mark.asyncio
 async def test_take_over_read_commands(df_factory, master_threads, replica_threads):
-    master = df_factory.create(
-        proactor_threads=master_threads,
-        logtostderr=True,
-    )
+    master = df_factory.create(proactor_threads=master_threads)
     replica = df_factory.create(proactor_threads=replica_threads)
 
     df_factory.start_all([master, replica])