From 37834d65de93febc26d76ebee645ab08e62e8c2c Mon Sep 17 00:00:00 2001 From: Xiaoyuan Jin Date: Fri, 27 Sep 2024 20:03:42 +0800 Subject: [PATCH] This is an automated cherry-pick of #56362 Signed-off-by: ti-chi-bot --- pkg/owner/BUILD.bazel | 63 +++++ pkg/owner/manager.go | 553 +++++++++++++++++++++++++++++++++++++ pkg/owner/manager_test.go | 562 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 1178 insertions(+) create mode 100644 pkg/owner/BUILD.bazel create mode 100644 pkg/owner/manager.go create mode 100644 pkg/owner/manager_test.go diff --git a/pkg/owner/BUILD.bazel b/pkg/owner/BUILD.bazel new file mode 100644 index 0000000000000..62dbe260e3127 --- /dev/null +++ b/pkg/owner/BUILD.bazel @@ -0,0 +1,63 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "owner", + srcs = [ + "manager.go", + "mock.go", + ], + importpath = "github.com/pingcap/tidb/pkg/owner", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ddl/util", + "//pkg/kv", + "//pkg/metrics", + "//pkg/parser/terror", + "//pkg/util", + "//pkg/util/logutil", + "//pkg/util/timeutil", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@io_etcd_go_etcd_api_v3//mvccpb", + "@io_etcd_go_etcd_api_v3//v3rpc/rpctypes", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_client_v3//concurrency", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "owner_test", + timeout = "short", + srcs = [ + "fail_test.go", + "main_test.go", + "manager_test.go", + ], + embed = [":owner"], + flaky = True, + shard_count = 9, + deps = [ + "//pkg/ddl", + "//pkg/infoschema", + "//pkg/kv", + "//pkg/parser/terror", + "//pkg/store/mockstore", + "//pkg/testkit", + "//pkg/testkit/testsetup", + "//pkg/util", + "//pkg/util/logutil", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_client_v3//concurrency", + "@io_etcd_go_etcd_server_v3//embed", + "@io_etcd_go_etcd_tests_v3//integration", + "@org_golang_google_grpc//:grpc", + "@org_golang_x_exp//rand", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go new file mode 100644 index 0000000000000..ea989b75f4856 --- /dev/null +++ b/pkg/owner/manager.go @@ -0,0 +1,553 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner + +import ( + "bytes" + "context" + "fmt" + "os" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/ddl/util" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/parser/terror" + util2 "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/logutil" + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" + atomicutil "go.uber.org/atomic" + "go.uber.org/zap" +) + +// Listener is used to listen the ownerManager's owner state. +type Listener interface { + OnBecomeOwner() + OnRetireOwner() +} + +// Manager is used to campaign the owner and manage the owner information. +type Manager interface { + // ID returns the ID of the manager. + ID() string + // IsOwner returns whether the ownerManager is the owner. + IsOwner() bool + // RetireOwner make the manager to be a not owner. It's exported for testing. + RetireOwner() + // GetOwnerID gets the owner ID. + GetOwnerID(ctx context.Context) (string, error) + // SetOwnerOpValue updates the owner op value. + SetOwnerOpValue(ctx context.Context, op OpType) error + // CampaignOwner campaigns the owner. + CampaignOwner(...int) error + // ResignOwner lets the owner start a new election. + ResignOwner(ctx context.Context) error + // Cancel cancels this etcd ownerManager. + Cancel() + // RequireOwner requires the ownerManager is owner. + RequireOwner(ctx context.Context) error + // CampaignCancel cancels one etcd campaign + CampaignCancel() + // SetListener sets the listener, set before CampaignOwner. + SetListener(listener Listener) +} + +const ( + keyOpDefaultTimeout = 5 * time.Second +) + +// OpType is the owner key value operation type. +type OpType byte + +// List operation of types. +const ( + OpNone OpType = 0 + OpSyncUpgradingState OpType = 1 +) + +// String implements fmt.Stringer interface. +func (ot OpType) String() string { + switch ot { + case OpSyncUpgradingState: + return "sync upgrading state" + default: + return "none" + } +} + +// IsSyncedUpgradingState represents whether the upgrading state is synchronized. +func (ot OpType) IsSyncedUpgradingState() bool { + return ot == OpSyncUpgradingState +} + +// DDLOwnerChecker is used to check whether tidb is owner. +type DDLOwnerChecker interface { + // IsOwner returns whether the ownerManager is the owner. + IsOwner() bool +} + +// ownerManager represents the structure which is used for electing owner. +type ownerManager struct { + id string // id is the ID of the manager. + key string + ctx context.Context + prompt string + logPrefix string + logCtx context.Context + etcdCli *clientv3.Client + cancel context.CancelFunc + elec atomic.Pointer[concurrency.Election] + sessionLease *atomicutil.Int64 + wg sync.WaitGroup + campaignCancel context.CancelFunc + + listener Listener +} + +// NewOwnerManager creates a new Manager. +func NewOwnerManager(ctx context.Context, etcdCli *clientv3.Client, prompt, id, key string) Manager { + logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", prompt, key, id) + ctx, cancelFunc := context.WithCancel(ctx) + return &ownerManager{ + etcdCli: etcdCli, + id: id, + key: key, + ctx: ctx, + prompt: prompt, + cancel: cancelFunc, + logPrefix: logPrefix, + logCtx: logutil.WithKeyValue(context.Background(), "owner info", logPrefix), + sessionLease: atomicutil.NewInt64(0), + } +} + +// ID implements Manager.ID interface. +func (m *ownerManager) ID() string { + return m.id +} + +// IsOwner implements Manager.IsOwner interface. +func (m *ownerManager) IsOwner() bool { + return m.elec.Load() != nil +} + +// Cancel implements Manager.Cancel interface. +func (m *ownerManager) Cancel() { + m.cancel() + m.wg.Wait() +} + +// RequireOwner implements Manager.RequireOwner interface. +func (*ownerManager) RequireOwner(_ context.Context) error { + return nil +} + +func (m *ownerManager) SetListener(listener Listener) { + m.listener = listener +} + +// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing. +var ManagerSessionTTL = 60 + +// setManagerSessionTTL sets the ManagerSessionTTL value, it's used for testing. +func setManagerSessionTTL() error { + ttlStr := os.Getenv("tidb_manager_ttl") + if ttlStr == "" { + return nil + } + ttl, err := strconv.Atoi(ttlStr) + if err != nil { + return errors.Trace(err) + } + ManagerSessionTTL = ttl + return nil +} + +// CampaignOwner implements Manager.CampaignOwner interface. +func (m *ownerManager) CampaignOwner(withTTL ...int) error { + ttl := ManagerSessionTTL + if len(withTTL) == 1 { + ttl = withTTL[0] + } + logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) + logutil.BgLogger().Info("start campaign owner", zap.String("ownerInfo", logPrefix)) + session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ttl) + if err != nil { + return errors.Trace(err) + } + m.sessionLease.Store(int64(session.Lease())) + m.wg.Add(1) + var campaignContext context.Context + campaignContext, m.campaignCancel = context.WithCancel(m.ctx) + go m.campaignLoop(campaignContext, session) + return nil +} + +// ResignOwner lets the owner start a new election. +func (m *ownerManager) ResignOwner(ctx context.Context) error { + elec := m.elec.Load() + if elec == nil { + return errors.Errorf("This node is not a owner, can't be resigned") + } + + childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout) + err := elec.Resign(childCtx) + cancel() + if err != nil { + return errors.Trace(err) + } + + logutil.Logger(m.logCtx).Warn("resign owner success") + return nil +} + +func (m *ownerManager) toBeOwner(elec *concurrency.Election) { + m.elec.Store(elec) + logutil.Logger(m.logCtx).Info("become owner") + if m.listener != nil { + m.listener.OnBecomeOwner() + } +} + +// RetireOwner make the manager to be a not owner. +func (m *ownerManager) RetireOwner() { + m.elec.Store(nil) + logutil.Logger(m.logCtx).Info("retire owner") + if m.listener != nil { + m.listener.OnRetireOwner() + } +} + +// CampaignCancel implements Manager.CampaignCancel interface. +func (m *ownerManager) CampaignCancel() { + m.campaignCancel() + m.wg.Wait() +} + +func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession *concurrency.Session) { + defer func() { + m.campaignCancel() + if r := recover(); r != nil { + logutil.BgLogger().Error("recover panic", zap.String("prompt", m.prompt), zap.Any("error", r), zap.Stack("buffer")) + metrics.PanicCounter.WithLabelValues(metrics.LabelDDLOwner).Inc() + } + m.wg.Done() + }() + + logPrefix := m.logPrefix + logCtx := m.logCtx + var err error + for { + if err != nil { + metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, err.Error()).Inc() + } + + select { + case <-etcdSession.Done(): + logutil.Logger(logCtx).Info("etcd session is done, creates a new one") + leaseID := etcdSession.Lease() + etcdSession, err = util2.NewSession(campaignContext, logPrefix, m.etcdCli, util2.NewSessionRetryUnlimited, ManagerSessionTTL) + if err != nil { + logutil.Logger(logCtx).Info("break campaign loop, NewSession failed", zap.Error(err)) + m.revokeSession(logPrefix, leaseID) + return + } + m.sessionLease.Store(int64(etcdSession.Lease())) + case <-campaignContext.Done(): + failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) { + if v.(string) == "delOwnerKeyAndNotOwner" { + logutil.Logger(logCtx).Info("mock break campaign and don't clear related info") + return + } + }) + logutil.Logger(logCtx).Info("break campaign loop, context is done") + m.revokeSession(logPrefix, etcdSession.Lease()) + return + default: + } + // If the etcd server turns clocks forward,the following case may occur. + // The etcd server deletes this session's lease ID, but etcd session doesn't find it. + // In this time if we do the campaign operation, the etcd server will return ErrLeaseNotFound. + if terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) { + if etcdSession != nil { + err = etcdSession.Close() + logutil.Logger(logCtx).Info("etcd session encounters the error of lease not found, closes it", zap.Error(err)) + } + continue + } + + elec := concurrency.NewElection(etcdSession, m.key) + err = elec.Campaign(campaignContext, m.id) + if err != nil { + logutil.Logger(logCtx).Info("failed to campaign", zap.Error(err)) + continue + } + + ownerKey, currRev, err := GetOwnerKeyInfo(campaignContext, logCtx, m.etcdCli, m.key, m.id) + if err != nil { + continue + } + + m.toBeOwner(elec) + err = m.watchOwner(campaignContext, etcdSession, ownerKey, currRev) + logutil.Logger(logCtx).Info("watch owner finished", zap.Error(err)) + m.RetireOwner() + + metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc() + logutil.Logger(logCtx).Warn("is not the owner") + } +} + +func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) { + // Revoke the session lease. + // If revoke takes longer than the ttl, lease is expired anyway. + cancelCtx, cancel := context.WithTimeout(context.Background(), + time.Duration(ManagerSessionTTL)*time.Second) + _, err := m.etcdCli.Revoke(cancelCtx, leaseID) + cancel() + logutil.Logger(m.logCtx).Info("revoke session", zap.Error(err)) +} + +// GetOwnerID implements Manager.GetOwnerID interface. +func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) { + _, ownerID, _, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key) + return string(ownerID), errors.Trace(err) +} + +func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, int64, error) { + var op OpType + var resp *clientv3.GetResponse + var err error + for i := 0; i < 3; i++ { + if err = ctx.Err(); err != nil { + return "", nil, op, 0, 0, errors.Trace(err) + } + + childCtx, cancel := context.WithTimeout(ctx, util.KeyOpDefaultTimeout) + resp, err = etcdCli.Get(childCtx, ownerPath, clientv3.WithFirstCreate()...) + cancel() + if err == nil { + break + } + logutil.Logger(logCtx).Info("etcd-cli get owner info failed", zap.String("key", ownerPath), zap.Int("retryCnt", i), zap.Error(err)) + time.Sleep(util.KeyOpRetryInterval) + } + if err != nil { + logutil.Logger(logCtx).Warn("etcd-cli get owner info failed", zap.Error(err)) + return "", nil, op, 0, 0, errors.Trace(err) + } + if len(resp.Kvs) == 0 { + return "", nil, op, 0, 0, concurrency.ErrElectionNoLeader + } + + var ownerID []byte + ownerID, op = splitOwnerValues(resp.Kvs[0].Value) + logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key), + zap.ByteString("ownerID", ownerID), zap.Stringer("op", op)) + return string(resp.Kvs[0].Key), ownerID, op, resp.Header.Revision, resp.Kvs[0].ModRevision, nil +} + +// GetOwnerKeyInfo gets the owner key and current revision. +func GetOwnerKeyInfo( + ctx, logCtx context.Context, + etcdCli *clientv3.Client, + etcdKey, id string, +) (string, int64, error) { + ownerKey, ownerID, _, currRevision, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey) + if err != nil { + return "", 0, errors.Trace(err) + } + if string(ownerID) != id { + logutil.Logger(logCtx).Warn("is not the owner") + return "", 0, errors.New("ownerInfoNotMatch") + } + + return ownerKey, currRevision, nil +} + +func splitOwnerValues(val []byte) ([]byte, OpType) { + vals := bytes.Split(val, []byte("_")) + var op OpType + if len(vals) == 2 { + op = OpType(vals[1][0]) + } + return vals[0], op +} + +func joinOwnerValues(vals ...[]byte) []byte { + return bytes.Join(vals, []byte("_")) +} + +// SetOwnerOpValue implements Manager.SetOwnerOpValue interface. +func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error { + // owner don't change. + ownerKey, ownerID, currOp, _, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key) + if err != nil { + return errors.Trace(err) + } + if currOp == op { + logutil.Logger(m.logCtx).Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op)) + return nil + } + if string(ownerID) != m.id { + return errors.New("ownerInfoNotMatch") + } + newOwnerVal := joinOwnerValues(ownerID, []byte{byte(op)}) + + failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) { + if valStr, ok := v.(string); ok { + if err := mockDelOwnerKey(valStr, ownerKey, m); err != nil { + failpoint.Return(err) + } + } + }) + + leaseOp := clientv3.WithLease(clientv3.LeaseID(m.sessionLease.Load())) + resp, err := m.etcdCli.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision(ownerKey), "=", modRevision)). + Then(clientv3.OpPut(ownerKey, string(newOwnerVal), leaseOp)). + Commit() + if err == nil && !resp.Succeeded { + err = errors.New("put owner key failed, cmp is false") + } + logutil.BgLogger().Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID), + zap.Stringer("old Op", currOp), zap.Stringer("op", op), zap.Error(err)) + metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.PutValue+"_"+metrics.RetLabel(err)).Inc() + return errors.Trace(err) +} + +// GetOwnerOpValue gets the owner op value. +func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, logPrefix string) (OpType, error) { + // It's using for testing. + if etcdCli == nil { + return *mockOwnerOpValue.Load(), nil + } + + logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) + _, _, op, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath) + return op, errors.Trace(err) +} + +// WatchOwnerForTest watches the ownerKey. +// This function is used to test watchOwner(). +func WatchOwnerForTest(ctx context.Context, m Manager, etcdSession *concurrency.Session, key string, createRevison int64) error { + if ownerManager, ok := m.(*ownerManager); ok { + return ownerManager.watchOwner(ctx, etcdSession, key, createRevison) + } + return nil +} + +func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string, currRev int64) error { + logPrefix := fmt.Sprintf("[%s] ownerManager %s watch owner key %v", m.prompt, m.id, key) + logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) + logutil.BgLogger().Debug(logPrefix) + // we need to watch the ownerKey since currRev + 1. + watchCh := m.etcdCli.Watch(ctx, key, clientv3.WithRev(currRev+1)) + for { + select { + case resp, ok := <-watchCh: + if !ok { + metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.WatcherClosed).Inc() + logutil.Logger(logCtx).Info("watcher is closed, no owner") + return errors.Errorf("watcher is closed, key: %v", key) + } + if resp.Canceled { + metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Cancelled).Inc() + logutil.Logger(logCtx).Info("watch canceled, no owner") + return errors.Errorf("watch canceled, key: %v", key) + } + + for _, ev := range resp.Events { + if ev.Type == mvccpb.DELETE { + metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Deleted).Inc() + logutil.Logger(logCtx).Info("watch failed, owner is deleted") + return nil + } + } + case <-etcdSession.Done(): + metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.SessionDone).Inc() + return nil + case <-ctx.Done(): + metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.CtxDone).Inc() + return nil + } + } +} + +func init() { + err := setManagerSessionTTL() + if err != nil { + logutil.BgLogger().Warn("set manager session TTL failed", zap.Error(err)) + } +} + +// DeleteLeader deletes the leader key. +func DeleteLeader(ctx context.Context, cli *clientv3.Client, key string) error { + ownerKey, _, _, _, _, err := getOwnerInfo(ctx, ctx, cli, key) + if err != nil { + return errors.Trace(err) + } + _, err = cli.Delete(ctx, ownerKey) + return err +} + +// AcquireDistributedLock creates a mutex with ETCD client, and returns a mutex release function. +func AcquireDistributedLock( + ctx context.Context, + cli *clientv3.Client, + key string, + ttlInSec int, +) (release func(), err error) { + se, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlInSec)) + if err != nil { + return nil, err + } + mu := concurrency.NewMutex(se, key) + maxRetryCnt := 10 + err = util2.RunWithRetry(maxRetryCnt, util2.RetryInterval, func() (bool, error) { + err = mu.Lock(ctx) + if err != nil { + return true, err + } + return false, nil + }) + if err != nil { + err1 := se.Close() + if err1 != nil { + logutil.Logger(ctx).Warn("close session error", zap.Error(err1)) + } + return nil, err + } + logutil.Logger(ctx).Info("acquire distributed flush lock success", zap.String("key", key)) + return func() { + err = mu.Unlock(ctx) + if err != nil { + logutil.Logger(ctx).Warn("release distributed flush lock error", zap.Error(err), zap.String("key", key)) + } else { + logutil.Logger(ctx).Info("release distributed flush lock success", zap.String("key", key)) + } + err = se.Close() + if err != nil { + logutil.Logger(ctx).Warn("close session error", zap.Error(err)) + } + }, nil +} diff --git a/pkg/owner/manager_test.go b/pkg/owner/manager_test.go new file mode 100644 index 0000000000000..907ebe79de896 --- /dev/null +++ b/pkg/owner/manager_test.go @@ -0,0 +1,562 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner_test + +import ( + "context" + "fmt" + "net/url" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + . "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/owner" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/store/mockstore" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" + "go.etcd.io/etcd/server/v3/embed" + "go.etcd.io/etcd/tests/v3/integration" + "golang.org/x/exp/rand" +) + +const testLease = 5 * time.Millisecond + +type testInfo struct { + store kv.Storage + cluster *integration.ClusterV3 + client *clientv3.Client + ddl DDL +} + +func newTestInfo(t *testing.T) *testInfo { + store, err := mockstore.NewMockStore() + require.NoError(t, err) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 4}) + + cli := cluster.Client(0) + ic := infoschema.NewCache(nil, 2) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) + d, _ := NewDDL( + context.Background(), + WithEtcdClient(cli), + WithStore(store), + WithLease(testLease), + WithInfoCache(ic), + ) + + return &testInfo{ + store: store, + cluster: cluster, + client: cli, + ddl: d, + } +} + +func (ti *testInfo) Close(t *testing.T) { + err := ti.ddl.Stop() + require.NoError(t, err) + err = ti.store.Close() + require.NoError(t, err) + ti.cluster.Terminate(t) +} + +type listener struct { + val atomic.Bool +} + +func (l *listener) OnBecomeOwner() { + l.val.Store(true) +} +func (l *listener) OnRetireOwner() { + l.val.Store(false) +} + +func TestSingle(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + client, d := tInfo.client, tInfo.ddl + defer tInfo.Close(t) + ownerManager := d.OwnerManager() + lis := &listener{} + ownerManager.SetListener(lis) + require.NoError(t, ownerManager.CampaignOwner()) + isOwner := checkOwner(d, true) + require.True(t, isOwner) + require.True(t, lis.val.Load()) + + // test for newSession failed + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + manager := owner.NewOwnerManager(ctx, client, "ddl", "ddl_id", DDLOwnerKey) + cancel() + + err := manager.CampaignOwner() + comment := fmt.Sprintf("campaigned result don't match, err %v", err) + require.True(t, terror.ErrorEqual(err, context.Canceled) || terror.ErrorEqual(err, context.DeadlineExceeded), comment) + + isOwner = checkOwner(d, true) + require.True(t, isOwner) + + // The test is used to exit campaign loop. + ownerManager.Cancel() + isOwner = checkOwner(d, false) + require.False(t, isOwner) + require.False(t, lis.val.Load()) + + time.Sleep(200 * time.Millisecond) + + // err is ok to be not nil since we canceled the manager. + ownerID, _ := manager.GetOwnerID(ctx) + require.Equal(t, "", ownerID) + op, _ := owner.GetOwnerOpValue(ctx, client, DDLOwnerKey, "log prefix") + require.Equal(t, op, owner.OpNone) +} + +func TestSetAndGetOwnerOpValue(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + defer tInfo.Close(t) + + require.NoError(t, tInfo.ddl.OwnerManager().CampaignOwner()) + isOwner := checkOwner(tInfo.ddl, true) + require.True(t, isOwner) + + // test set/get owner info + manager := tInfo.ddl.OwnerManager() + ownerID, err := manager.GetOwnerID(context.Background()) + require.NoError(t, err) + require.Equal(t, tInfo.ddl.GetID(), ownerID) + op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpNone) + require.False(t, op.IsSyncedUpgradingState()) + err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) + require.NoError(t, err) + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpSyncUpgradingState) + require.True(t, op.IsSyncedUpgradingState()) + // update the same as the original value + err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) + require.NoError(t, err) + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpSyncUpgradingState) + require.True(t, op.IsSyncedUpgradingState()) + // test del owner key when SetOwnerOpValue + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey", `return("delOwnerKeyAndNotOwner")`)) + err = manager.SetOwnerOpValue(context.Background(), owner.OpNone) + require.Error(t, err, "put owner key failed, cmp is false") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NotNil(t, err) + require.Equal(t, concurrency.ErrElectionNoLeader.Error(), err.Error()) + require.Equal(t, op, owner.OpNone) + require.False(t, op.IsSyncedUpgradingState()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey")) + + // Let ddl run for the owner again. + require.NoError(t, tInfo.ddl.OwnerManager().CampaignOwner()) + isOwner = checkOwner(tInfo.ddl, true) + require.True(t, isOwner) + // Mock the manager become not owner because the owner is deleted(like TTL is timeout). + // And then the manager campaigns the owner again, and become the owner. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey", `return("onlyDelOwnerKey")`)) + err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) + require.Error(t, err, "put owner key failed, cmp is false") + isOwner = checkOwner(tInfo.ddl, true) + require.True(t, isOwner) + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpNone) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey")) +} + +// TestGetOwnerOpValueBeforeSet tests get owner opValue before set this value when the etcdClient is nil. +func TestGetOwnerOpValueBeforeSet(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockNotSetOwnerOp", `return(true)`)) + + _, dom := testkit.CreateMockStoreAndDomain(t) + ddl := dom.DDL() + require.NoError(t, ddl.OwnerManager().CampaignOwner()) + isOwner := checkOwner(ddl, true) + require.True(t, isOwner) + + // test set/get owner info + manager := ddl.OwnerManager() + ownerID, err := manager.GetOwnerID(context.Background()) + require.NoError(t, err) + require.Equal(t, ddl.GetID(), ownerID) + op, err := owner.GetOwnerOpValue(context.Background(), nil, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpNone) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockNotSetOwnerOp")) + err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) + require.NoError(t, err) + op, err = owner.GetOwnerOpValue(context.Background(), nil, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpSyncUpgradingState) +} + +func TestCluster(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + originalTTL := owner.ManagerSessionTTL + owner.ManagerSessionTTL = 3 + defer func() { + owner.ManagerSessionTTL = originalTTL + }() + + tInfo := newTestInfo(t) + store, cluster, d := tInfo.store, tInfo.cluster, tInfo.ddl + defer tInfo.Close(t) + require.NoError(t, d.OwnerManager().CampaignOwner()) + + isOwner := checkOwner(d, true) + require.True(t, isOwner) + + cli1 := cluster.Client(1) + ic2 := infoschema.NewCache(nil, 2) + ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) + d1, _ := NewDDL( + context.Background(), + WithEtcdClient(cli1), + WithStore(store), + WithLease(testLease), + WithInfoCache(ic2), + ) + require.NoError(t, d1.OwnerManager().CampaignOwner()) + + isOwner = checkOwner(d1, false) + require.False(t, isOwner) + + // Delete the leader key, the d1 become the owner. + cliRW := cluster.Client(2) + err := deleteLeader(cliRW, DDLOwnerKey) + require.NoError(t, err) + + isOwner = checkOwner(d, false) + require.False(t, isOwner) + + d.OwnerManager().Cancel() + // d3 (not owner) stop + cli3 := cluster.Client(3) + ic3 := infoschema.NewCache(nil, 2) + ic3.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) + d3, _ := NewDDL( + context.Background(), + WithEtcdClient(cli3), + WithStore(store), + WithLease(testLease), + WithInfoCache(ic3), + ) + require.NoError(t, d3.OwnerManager().CampaignOwner()) + + isOwner = checkOwner(d3, false) + require.False(t, isOwner) + + d3.OwnerManager().Cancel() + // Cancel the owner context, there is no owner. + d1.OwnerManager().Cancel() + + logPrefix := fmt.Sprintf("[ddl] %s ownerManager %s", DDLOwnerKey, "useless id") + logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) + _, _, err = owner.GetOwnerKeyInfo(context.Background(), logCtx, cliRW, DDLOwnerKey, "useless id") + require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) + op, err := owner.GetOwnerOpValue(context.Background(), cliRW, DDLOwnerKey, logPrefix) + require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) + require.Equal(t, op, owner.OpNone) +} + +func TestWatchOwner(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + client, d := tInfo.client, tInfo.ddl + defer tInfo.Close(t) + ownerManager := d.OwnerManager() + lis := &listener{} + ownerManager.SetListener(lis) + require.NoError(t, ownerManager.CampaignOwner()) + isOwner := checkOwner(d, true) + require.True(t, isOwner) + + // get the owner id. + ctx := context.Background() + id, err := ownerManager.GetOwnerID(ctx) + require.NoError(t, err) + + // create etcd session. + session, err := concurrency.NewSession(client) + require.NoError(t, err) + + // test the GetOwnerKeyInfo() + ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, DDLOwnerKey, id) + require.NoError(t, err) + + // watch the ownerKey. + ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*300) + defer cancel2() + watchDone := make(chan bool) + watched := false + go func() { + watchErr := owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision) + require.NoError(t, watchErr) + watchDone <- true + }() + + select { + case watched = <-watchDone: + case <-ctx2.Done(): + } + require.False(t, watched) + + // delete the owner, and can watch the DELETE event. + err = deleteLeader(client, DDLOwnerKey) + require.NoError(t, err) + watched = <-watchDone + require.True(t, watched) + + // the ownerKey has been deleted, watch ownerKey again, it can be watched. + go func() { + watchErr := owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision) + require.NoError(t, watchErr) + watchDone <- true + }() + + watched = <-watchDone + require.True(t, watched) +} + +func TestWatchOwnerAfterDeleteOwnerKey(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + client, d := tInfo.client, tInfo.ddl + defer tInfo.Close(t) + ownerManager := d.OwnerManager() + lis := &listener{} + ownerManager.SetListener(lis) + require.NoError(t, ownerManager.CampaignOwner()) + isOwner := checkOwner(d, true) + require.True(t, isOwner) + + // get the owner id. + ctx := context.Background() + id, err := ownerManager.GetOwnerID(ctx) + require.NoError(t, err) + session, err := concurrency.NewSession(client) + require.NoError(t, err) + + // get the ownkey informations. + ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, DDLOwnerKey, id) + require.NoError(t, err) + + // delete the ownerkey + err = deleteLeader(client, DDLOwnerKey) + require.NoError(t, err) + + // watch the ownerKey with the current revisoin. + watchDone := make(chan bool) + go func() { + watchErr := owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision) + require.NoError(t, watchErr) + watchDone <- true + }() + <-watchDone +} + +func checkOwner(d DDL, fbVal bool) (isOwner bool) { + manager := d.OwnerManager() + // The longest to wait for 30 seconds to + // make sure that campaigning owners is completed. + for i := 0; i < 6000; i++ { + time.Sleep(5 * time.Millisecond) + isOwner = manager.IsOwner() + if isOwner == fbVal { + break + } + } + return +} + +func deleteLeader(cli *clientv3.Client, prefixKey string) error { + session, err := concurrency.NewSession(cli) + if err != nil { + return errors.Trace(err) + } + defer func() { + _ = session.Close() + }() + election := concurrency.NewElection(session, prefixKey) + resp, err := election.Leader(context.Background()) + if err != nil { + return errors.Trace(err) + } + _, err = cli.Delete(context.Background(), string(resp.Kvs[0].Key)) + return errors.Trace(err) +} + +func TestImmediatelyCancel(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + d := tInfo.ddl + defer tInfo.Close(t) + ownerManager := d.OwnerManager() + for i := 0; i < 10; i++ { + err := ownerManager.CampaignOwner() + require.NoError(t, err) + ownerManager.CampaignCancel() + } +} + +func TestAcquireDistributedLock(t *testing.T) { + const addrFmt = "http://127.0.0.1:%d" + cfg := embed.NewConfig() + cfg.Dir = t.TempDir() + // rand port in [20000, 60000) + randPort := int(rand.Int31n(40000)) + 20000 + clientAddr := fmt.Sprintf(addrFmt, randPort) + lcurl, _ := url.Parse(clientAddr) + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*lcurl}, []url.URL{*lcurl} + lpurl, _ := url.Parse(fmt.Sprintf(addrFmt, randPort+1)) + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*lpurl}, []url.URL{*lpurl} + cfg.InitialCluster = "default=" + lpurl.String() + cfg.Logger = "zap" + embedEtcd, err := embed.StartEtcd(cfg) + require.NoError(t, err) + <-embedEtcd.Server.ReadyNotify() + t.Cleanup(func() { + embedEtcd.Close() + }) + makeEtcdCli := func(t *testing.T) (cli *clientv3.Client) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{lcurl.String()}, + }) + require.NoError(t, err) + t.Cleanup(func() { + cli.Close() + }) + return cli + } + t.Run("acquire distributed lock with same client", func(t *testing.T) { + cli := makeEtcdCli(t) + getLock := make(chan struct{}) + ctx := context.Background() + + release1, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10) + require.NoError(t, err) + var wg util.WaitGroupWrapper + wg.Run(func() { + // Acquire another distributed lock will be blocked. + release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10) + require.NoError(t, err) + getLock <- struct{}{} + release2() + }) + timer := time.NewTimer(300 * time.Millisecond) + select { + case <-getLock: + require.FailNow(t, "acquired same lock unexpectedly") + case <-timer.C: + release1() + <-getLock + } + wg.Wait() + + release1, err = owner.AcquireDistributedLock(ctx, cli, "test-lock/1", 10) + require.NoError(t, err) + release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock/2", 10) + require.NoError(t, err) + release1() + release2() + }) + + t.Run("acquire distributed lock with different clients", func(t *testing.T) { + cli1 := makeEtcdCli(t) + cli2 := makeEtcdCli(t) + + getLock := make(chan struct{}) + ctx := context.Background() + + release1, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 10) + require.NoError(t, err) + var wg util.WaitGroupWrapper + wg.Run(func() { + // Acquire another distributed lock will be blocked. + release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10) + require.NoError(t, err) + getLock <- struct{}{} + release2() + }) + timer := time.NewTimer(300 * time.Millisecond) + select { + case <-getLock: + require.FailNow(t, "acquired same lock unexpectedly") + case <-timer.C: + release1() + <-getLock + } + wg.Wait() + }) + + t.Run("acquire distributed lock until timeout", func(t *testing.T) { + cli1 := makeEtcdCli(t) + cli2 := makeEtcdCli(t) + ctx := context.Background() + + _, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 1) + require.NoError(t, err) + cli1.Close() // Note that release() is not invoked. + + release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10) + require.NoError(t, err) + release2() + }) +}