From ca695a5354a07768fe3929bf17becc192a88bcdb Mon Sep 17 00:00:00 2001 From: Varik Matevosyan Date: Wed, 20 Nov 2024 12:02:22 +0400 Subject: [PATCH] watch logical replication after enabling, to refresh subscription each 20s --- model/lantern/lantern_resource.rb | 1 - prog/lantern/lantern_resource_nexus.rb | 20 ++++++++++++++ .../lantern/lantern_resource_nexus_spec.rb | 26 +++++++++++++++++++ 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/model/lantern/lantern_resource.rb b/model/lantern/lantern_resource.rb index f871ac4f5..1c24ccba2 100644 --- a/model/lantern/lantern_resource.rb +++ b/model/lantern/lantern_resource.rb @@ -159,7 +159,6 @@ def listen_ddl_log EXECUTE format('SET ROLE %I', NEW.session_user); EXECUTE NEW.ddl_command; RESET ROLE; - ALTER SUBSCRIPTION sub_#{ubid} REFRESH PUBLICATION; RETURN NEW; END; $$ LANGUAGE plpgsql; diff --git a/prog/lantern/lantern_resource_nexus.rb b/prog/lantern/lantern_resource_nexus.rb index 73e737c4c..331947407 100644 --- a/prog/lantern/lantern_resource_nexus.rb +++ b/prog/lantern/lantern_resource_nexus.rb @@ -184,9 +184,27 @@ def before_run label def enable_logical_replication lantern_resource.listen_ddl_log lantern_resource.create_and_enable_subscription + bud Prog::Lantern::LanternResourceNexus, {"subscription" => "sub_#{lantern_resource.ubid}"}, "watch_logical_replication" hop_wait end + label def watch_logical_replication + if !lantern_resource.logical_replication + pop "logical replication disabled" + end + + sub = frame["subscription"] + subscription_exists = !lantern_resource.representative_server.run_query("SELECT subname FROM pg_subscription WHERE subname='#{sub}'").empty? + + if !subscription_exists + pop "subscription deleted" + end + + lantern_resource.representative_server.run_query("ALTER SUBSCRIPTION #{sub} REFRESH PUBLICATION") + + nap 20 + end + label def wait # Create missing standbys (lantern_resource.required_standby_count + 1 - lantern_resource.servers.count).times do @@ -232,6 +250,8 @@ def before_run end end + reap + nap 30 end diff --git a/spec/prog/lantern/lantern_resource_nexus_spec.rb b/spec/prog/lantern/lantern_resource_nexus_spec.rb index b70083cd6..0bf10ec38 100644 --- a/spec/prog/lantern/lantern_resource_nexus_spec.rb +++ b/spec/prog/lantern/lantern_resource_nexus_spec.rb @@ -348,10 +348,36 @@ it "enables logical replication" do expect(lantern_resource).to receive(:listen_ddl_log) expect(lantern_resource).to receive(:create_and_enable_subscription) + expect(nx).to receive(:bud).with(described_class, {"subscription" => "sub_#{lantern_resource.ubid}"}, "watch_logical_replication") expect { nx.enable_logical_replication }.to hop("wait") end end + describe "#watch_logical_replication" do + it "pops if not in logical replication" do + expect(lantern_resource).to receive(:logical_replication).and_return(false) + expect { nx.watch_logical_replication }.to exit({"msg" => "logical replication disabled"}) + end + + it "pops if subscription is deleted" do + expect(lantern_resource).to receive(:logical_replication).and_return(true) + representative_server = instance_double(LanternServer) + expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once) + expect(representative_server).to receive(:run_query).and_return("") + expect { nx.watch_logical_replication }.to exit({"msg" => "subscription deleted"}) + end + + it "refreshes subscription" do + expect(lantern_resource).to receive(:logical_replication).and_return(true) + representative_server = instance_double(LanternServer) + expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once) + expect(representative_server).to receive(:run_query).and_return("sub") + expect(nx).to receive("frame").and_return({"subscription" => "test_sub"}) + expect(representative_server).to receive(:run_query).with("ALTER SUBSCRIPTION test_sub REFRESH PUBLICATION") + expect { nx.watch_logical_replication }.to nap 20 + end + end + describe "#swap_leaders_with_parent" do it "swaps ips with parent leader" do parent = instance_double(LanternResource)