From 2b1fafa575cb29e176b34a07767e82d55273cf79 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Fri, 8 Dec 2023 18:27:32 +0800
Subject: [PATCH] plugin: fix bug that watch loop will refresh frequently when channel closed (#49275) (#49289)

close pingcap/tidb#49273
---
 plugin/BUILD.bazel    |  3 ++-
 plugin/plugin.go      | 39 ++++++++++++++++++++++++++++++++++++---
 plugin/plugin_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/plugin/BUILD.bazel b/plugin/BUILD.bazel
index 69c18887fc4cb..4d9f0d880c611 100644
--- a/plugin/BUILD.bazel
+++ b/plugin/BUILD.bazel
@@ -38,7 +38,7 @@ go_test(
     ],
     embed = [":plugin"],
     flaky = True,
-    shard_count = 10,
+    shard_count = 11,
     deps = [
         "//kv",
         "//parser/mysql",
@@ -48,6 +48,7 @@ go_test(
         "//testkit",
         "//testkit/testsetup",
         "@com_github_stretchr_testify//require",
+        "@io_etcd_go_etcd_client_v3//:client",
         "@org_uber_go_goleak//:goleak",
     ],
 )
diff --git a/plugin/plugin.go b/plugin/plugin.go
index 249e6bae25b17..204c890509e4d 100644
--- a/plugin/plugin.go
+++ b/plugin/plugin.go
@@ -20,6 +20,7 @@ import (
 	gplugin "plugin"
 	"strconv"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/pingcap/errors"
@@ -273,14 +274,46 @@ func (w *flushWatcher) refreshPluginState() error {
 	}
 	return nil
 }
 
+// watchLoop watches w.path in etcd until w.ctx is cancelled. When the watch
+// channel is closed it waits reWatchInterval before creating a new watch, so
+// a closed channel cannot make the loop spin.
 func (w *flushWatcher) watchLoop() {
-	watchChan := w.etcd.Watch(w.ctx, w.path)
+	const reWatchInterval = time.Second * 5
+	logutil.BgLogger().Info("plugin flushWatcher loop started", zap.String("plugin", w.manifest.Name))
+	for w.ctx.Err() == nil {
+		ch := w.etcd.Watch(w.ctx, w.path)
+		if exit := w.watchLoopWithChan(ch); exit {
+			break
+		}
+
+		logutil.BgLogger().Info(
+			"plugin flushWatcher old chan closed, restart loop later",
+			zap.String("plugin", w.manifest.Name),
+			zap.Duration("after", reWatchInterval))
+		// Back off before re-watching, but stop waiting as soon as the context
+		// is cancelled so that shutdown is not delayed by the interval.
+		select {
+		case <-w.ctx.Done():
+			return
+		case <-time.After(reWatchInterval):
+		}
+	}
+}
+
+// watchLoopWithChan receives from ch and calls refreshPluginState for every
+// value received. It returns true when w.ctx is done (the caller should stop)
+// and false when ch has been closed (the caller may create a new watch).
+func (w *flushWatcher) watchLoopWithChan(ch clientv3.WatchChan) (exit bool) {
 	for {
 		select {
 		case <-w.ctx.Done():
-			return
-		case <-watchChan:
+			return true
+		case _, ok := <-ch:
+			if !ok {
+				return false
+			}
+			logutil.BgLogger().Info("plugin flushWatcher detected event to reload plugin config", zap.String("plugin", w.manifest.Name))
 			_ = w.refreshPluginState()
 		}
 	}
diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go
index cfcc85ef310f3..088da35e59bfa 100644
--- a/plugin/plugin_test.go
+++ b/plugin/plugin_test.go
@@ -18,10 +18,13 @@ import (
 	"context"
 	"io"
 	"strconv"
+	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/stretchr/testify/require"
+	clientv3 "go.etcd.io/etcd/client/v3"
 )
 
 func TestLoadPluginSuccess(t *testing.T) {
@@ -242,3 +245,44 @@ func TestPluginsClone(t *testing.T) {
 	require.Equal(t, uint16(1), cps.versions["whitelist"])
 	require.Len(t, cps.dyingPlugins, 1)
 }
+
+// TestPluginWatcherLoop checks both ways watchLoopWithChan can return.
+func TestPluginWatcherLoop(t *testing.T) {
+	// exit when ctx done
+	ctx, cancel := context.WithCancel(context.Background())
+	watcher := &flushWatcher{
+		ctx: ctx,
+		manifest: &Manifest{
+			Name: "test",
+		},
+	}
+	ch := make(chan clientv3.WatchResponse)
+	var cancelled atomic.Bool
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		cancelled.Store(true)
+		cancel()
+	}()
+	exit := watcher.watchLoopWithChan(ch)
+	require.True(t, exit)
+	require.True(t, cancelled.Load())
+
+	// exit when ch closed
+	watcher = &flushWatcher{
+		ctx: context.Background(),
+		manifest: &Manifest{
+			Name: "test",
+		},
+	}
+
+	var closed atomic.Bool
+	ch = make(chan clientv3.WatchResponse)
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		closed.Store(true)
+		close(ch)
+	}()
+	exit = watcher.watchLoopWithChan(ch)
+	require.False(t, exit)
+	require.True(t, closed.Load())
+}