From 8752a714716b7009d5a5da7e1192a62f4756a102 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 10 Feb 2025 11:15:43 +0800 Subject: [PATCH] resolve conflict --- owner/BUILD.bazel | 4 - owner/manager.go | 8 +- owner/manager_test.go | 34 +++ pkg/owner/manager.go | 553 ------------------------------------- pkg/owner/manager_test.go | 562 -------------------------------------- 5 files changed, 38 insertions(+), 1123 deletions(-) delete mode 100644 pkg/owner/manager.go delete mode 100644 pkg/owner/manager_test.go diff --git a/owner/BUILD.bazel b/owner/BUILD.bazel index 6636e65e9a9a2..0a3024113369a 100644 --- a/owner/BUILD.bazel +++ b/owner/BUILD.bazel @@ -32,10 +32,6 @@ go_test( ], embed = [":owner"], flaky = True, -<<<<<<< HEAD:owner/BUILD.bazel -======= - shard_count = 9, ->>>>>>> afdd5c2ecd5 (owner: fix data race on ownerManager.campaignCancel (#56362)):pkg/owner/BUILD.bazel deps = [ "//ddl", "//infoschema", diff --git a/owner/manager.go b/owner/manager.go index 4223a433b8b55..260b9df8655bd 100644 --- a/owner/manager.go +++ b/owner/manager.go @@ -154,7 +154,9 @@ func (m *ownerManager) CampaignOwner() error { return errors.Trace(err) } m.wg.Add(1) - go m.campaignLoop(session) + var campaignContext context.Context + campaignContext, m.campaignCancel = context.WithCancel(m.ctx) + go m.campaignLoop(campaignContext, session) return nil } @@ -194,9 +196,7 @@ func (m *ownerManager) CampaignCancel() { m.wg.Wait() } -func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) { - var campaignContext context.Context - campaignContext, m.campaignCancel = context.WithCancel(m.ctx) +func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession *concurrency.Session) { defer func() { m.campaignCancel() if r := recover(); r != nil { diff --git a/owner/manager_test.go b/owner/manager_test.go index fecad76341852..1488295e7f73d 100644 --- a/owner/manager_test.go +++ b/owner/manager_test.go @@ -36,6 +36,23 @@ import ( const testLease = 5 * time.Millisecond +type testInfo struct { + cluster *integration.ClusterV3 + client *clientv3.Client +} + +func newTestInfo(t *testing.T) *testInfo { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + return &testInfo{ + cluster: cluster, + client: cluster.Client(0), + } +} + +func (ti *testInfo) Close(t *testing.T) { + ti.cluster.Terminate(t) +} + func TestSingle(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") @@ -213,3 +230,20 @@ func deleteLeader(cli *clientv3.Client, prefixKey string) error { _, err = cli.Delete(context.Background(), string(resp.Kvs[0].Key)) return errors.Trace(err) } + +func TestImmediatelyCancel(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + defer tInfo.Close(t) + ownerMgr := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "1", "/owner/key") + defer ownerMgr.Cancel() + for i := 0; i < 10; i++ { + err := ownerMgr.CampaignOwner() + require.NoError(t, err) + ownerMgr.CampaignCancel() + } +} diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go deleted file mode 100644 index ea989b75f4856..0000000000000 --- a/pkg/owner/manager.go +++ /dev/null @@ -1,553 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package owner - -import ( - "bytes" - "context" - "fmt" - "os" - "strconv" - "sync" - "sync/atomic" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/pkg/ddl/util" - "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/parser/terror" - util2 "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/logutil" - "go.etcd.io/etcd/api/v3/mvccpb" - "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/concurrency" - atomicutil "go.uber.org/atomic" - "go.uber.org/zap" -) - -// Listener is used to listen the ownerManager's owner state. -type Listener interface { - OnBecomeOwner() - OnRetireOwner() -} - -// Manager is used to campaign the owner and manage the owner information. -type Manager interface { - // ID returns the ID of the manager. - ID() string - // IsOwner returns whether the ownerManager is the owner. - IsOwner() bool - // RetireOwner make the manager to be a not owner. It's exported for testing. - RetireOwner() - // GetOwnerID gets the owner ID. - GetOwnerID(ctx context.Context) (string, error) - // SetOwnerOpValue updates the owner op value. - SetOwnerOpValue(ctx context.Context, op OpType) error - // CampaignOwner campaigns the owner. - CampaignOwner(...int) error - // ResignOwner lets the owner start a new election. - ResignOwner(ctx context.Context) error - // Cancel cancels this etcd ownerManager. - Cancel() - // RequireOwner requires the ownerManager is owner. - RequireOwner(ctx context.Context) error - // CampaignCancel cancels one etcd campaign - CampaignCancel() - // SetListener sets the listener, set before CampaignOwner. - SetListener(listener Listener) -} - -const ( - keyOpDefaultTimeout = 5 * time.Second -) - -// OpType is the owner key value operation type. -type OpType byte - -// List operation of types. -const ( - OpNone OpType = 0 - OpSyncUpgradingState OpType = 1 -) - -// String implements fmt.Stringer interface. -func (ot OpType) String() string { - switch ot { - case OpSyncUpgradingState: - return "sync upgrading state" - default: - return "none" - } -} - -// IsSyncedUpgradingState represents whether the upgrading state is synchronized. -func (ot OpType) IsSyncedUpgradingState() bool { - return ot == OpSyncUpgradingState -} - -// DDLOwnerChecker is used to check whether tidb is owner. -type DDLOwnerChecker interface { - // IsOwner returns whether the ownerManager is the owner. - IsOwner() bool -} - -// ownerManager represents the structure which is used for electing owner. -type ownerManager struct { - id string // id is the ID of the manager. - key string - ctx context.Context - prompt string - logPrefix string - logCtx context.Context - etcdCli *clientv3.Client - cancel context.CancelFunc - elec atomic.Pointer[concurrency.Election] - sessionLease *atomicutil.Int64 - wg sync.WaitGroup - campaignCancel context.CancelFunc - - listener Listener -} - -// NewOwnerManager creates a new Manager. -func NewOwnerManager(ctx context.Context, etcdCli *clientv3.Client, prompt, id, key string) Manager { - logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", prompt, key, id) - ctx, cancelFunc := context.WithCancel(ctx) - return &ownerManager{ - etcdCli: etcdCli, - id: id, - key: key, - ctx: ctx, - prompt: prompt, - cancel: cancelFunc, - logPrefix: logPrefix, - logCtx: logutil.WithKeyValue(context.Background(), "owner info", logPrefix), - sessionLease: atomicutil.NewInt64(0), - } -} - -// ID implements Manager.ID interface. -func (m *ownerManager) ID() string { - return m.id -} - -// IsOwner implements Manager.IsOwner interface. -func (m *ownerManager) IsOwner() bool { - return m.elec.Load() != nil -} - -// Cancel implements Manager.Cancel interface. -func (m *ownerManager) Cancel() { - m.cancel() - m.wg.Wait() -} - -// RequireOwner implements Manager.RequireOwner interface. -func (*ownerManager) RequireOwner(_ context.Context) error { - return nil -} - -func (m *ownerManager) SetListener(listener Listener) { - m.listener = listener -} - -// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing. -var ManagerSessionTTL = 60 - -// setManagerSessionTTL sets the ManagerSessionTTL value, it's used for testing. -func setManagerSessionTTL() error { - ttlStr := os.Getenv("tidb_manager_ttl") - if ttlStr == "" { - return nil - } - ttl, err := strconv.Atoi(ttlStr) - if err != nil { - return errors.Trace(err) - } - ManagerSessionTTL = ttl - return nil -} - -// CampaignOwner implements Manager.CampaignOwner interface. -func (m *ownerManager) CampaignOwner(withTTL ...int) error { - ttl := ManagerSessionTTL - if len(withTTL) == 1 { - ttl = withTTL[0] - } - logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) - logutil.BgLogger().Info("start campaign owner", zap.String("ownerInfo", logPrefix)) - session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ttl) - if err != nil { - return errors.Trace(err) - } - m.sessionLease.Store(int64(session.Lease())) - m.wg.Add(1) - var campaignContext context.Context - campaignContext, m.campaignCancel = context.WithCancel(m.ctx) - go m.campaignLoop(campaignContext, session) - return nil -} - -// ResignOwner lets the owner start a new election. -func (m *ownerManager) ResignOwner(ctx context.Context) error { - elec := m.elec.Load() - if elec == nil { - return errors.Errorf("This node is not a owner, can't be resigned") - } - - childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout) - err := elec.Resign(childCtx) - cancel() - if err != nil { - return errors.Trace(err) - } - - logutil.Logger(m.logCtx).Warn("resign owner success") - return nil -} - -func (m *ownerManager) toBeOwner(elec *concurrency.Election) { - m.elec.Store(elec) - logutil.Logger(m.logCtx).Info("become owner") - if m.listener != nil { - m.listener.OnBecomeOwner() - } -} - -// RetireOwner make the manager to be a not owner. -func (m *ownerManager) RetireOwner() { - m.elec.Store(nil) - logutil.Logger(m.logCtx).Info("retire owner") - if m.listener != nil { - m.listener.OnRetireOwner() - } -} - -// CampaignCancel implements Manager.CampaignCancel interface. -func (m *ownerManager) CampaignCancel() { - m.campaignCancel() - m.wg.Wait() -} - -func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession *concurrency.Session) { - defer func() { - m.campaignCancel() - if r := recover(); r != nil { - logutil.BgLogger().Error("recover panic", zap.String("prompt", m.prompt), zap.Any("error", r), zap.Stack("buffer")) - metrics.PanicCounter.WithLabelValues(metrics.LabelDDLOwner).Inc() - } - m.wg.Done() - }() - - logPrefix := m.logPrefix - logCtx := m.logCtx - var err error - for { - if err != nil { - metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, err.Error()).Inc() - } - - select { - case <-etcdSession.Done(): - logutil.Logger(logCtx).Info("etcd session is done, creates a new one") - leaseID := etcdSession.Lease() - etcdSession, err = util2.NewSession(campaignContext, logPrefix, m.etcdCli, util2.NewSessionRetryUnlimited, ManagerSessionTTL) - if err != nil { - logutil.Logger(logCtx).Info("break campaign loop, NewSession failed", zap.Error(err)) - m.revokeSession(logPrefix, leaseID) - return - } - m.sessionLease.Store(int64(etcdSession.Lease())) - case <-campaignContext.Done(): - failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) { - if v.(string) == "delOwnerKeyAndNotOwner" { - logutil.Logger(logCtx).Info("mock break campaign and don't clear related info") - return - } - }) - logutil.Logger(logCtx).Info("break campaign loop, context is done") - m.revokeSession(logPrefix, etcdSession.Lease()) - return - default: - } - // If the etcd server turns clocks forward,the following case may occur. - // The etcd server deletes this session's lease ID, but etcd session doesn't find it. - // In this time if we do the campaign operation, the etcd server will return ErrLeaseNotFound. - if terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) { - if etcdSession != nil { - err = etcdSession.Close() - logutil.Logger(logCtx).Info("etcd session encounters the error of lease not found, closes it", zap.Error(err)) - } - continue - } - - elec := concurrency.NewElection(etcdSession, m.key) - err = elec.Campaign(campaignContext, m.id) - if err != nil { - logutil.Logger(logCtx).Info("failed to campaign", zap.Error(err)) - continue - } - - ownerKey, currRev, err := GetOwnerKeyInfo(campaignContext, logCtx, m.etcdCli, m.key, m.id) - if err != nil { - continue - } - - m.toBeOwner(elec) - err = m.watchOwner(campaignContext, etcdSession, ownerKey, currRev) - logutil.Logger(logCtx).Info("watch owner finished", zap.Error(err)) - m.RetireOwner() - - metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc() - logutil.Logger(logCtx).Warn("is not the owner") - } -} - -func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) { - // Revoke the session lease. - // If revoke takes longer than the ttl, lease is expired anyway. - cancelCtx, cancel := context.WithTimeout(context.Background(), - time.Duration(ManagerSessionTTL)*time.Second) - _, err := m.etcdCli.Revoke(cancelCtx, leaseID) - cancel() - logutil.Logger(m.logCtx).Info("revoke session", zap.Error(err)) -} - -// GetOwnerID implements Manager.GetOwnerID interface. -func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) { - _, ownerID, _, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key) - return string(ownerID), errors.Trace(err) -} - -func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, int64, error) { - var op OpType - var resp *clientv3.GetResponse - var err error - for i := 0; i < 3; i++ { - if err = ctx.Err(); err != nil { - return "", nil, op, 0, 0, errors.Trace(err) - } - - childCtx, cancel := context.WithTimeout(ctx, util.KeyOpDefaultTimeout) - resp, err = etcdCli.Get(childCtx, ownerPath, clientv3.WithFirstCreate()...) - cancel() - if err == nil { - break - } - logutil.Logger(logCtx).Info("etcd-cli get owner info failed", zap.String("key", ownerPath), zap.Int("retryCnt", i), zap.Error(err)) - time.Sleep(util.KeyOpRetryInterval) - } - if err != nil { - logutil.Logger(logCtx).Warn("etcd-cli get owner info failed", zap.Error(err)) - return "", nil, op, 0, 0, errors.Trace(err) - } - if len(resp.Kvs) == 0 { - return "", nil, op, 0, 0, concurrency.ErrElectionNoLeader - } - - var ownerID []byte - ownerID, op = splitOwnerValues(resp.Kvs[0].Value) - logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key), - zap.ByteString("ownerID", ownerID), zap.Stringer("op", op)) - return string(resp.Kvs[0].Key), ownerID, op, resp.Header.Revision, resp.Kvs[0].ModRevision, nil -} - -// GetOwnerKeyInfo gets the owner key and current revision. -func GetOwnerKeyInfo( - ctx, logCtx context.Context, - etcdCli *clientv3.Client, - etcdKey, id string, -) (string, int64, error) { - ownerKey, ownerID, _, currRevision, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey) - if err != nil { - return "", 0, errors.Trace(err) - } - if string(ownerID) != id { - logutil.Logger(logCtx).Warn("is not the owner") - return "", 0, errors.New("ownerInfoNotMatch") - } - - return ownerKey, currRevision, nil -} - -func splitOwnerValues(val []byte) ([]byte, OpType) { - vals := bytes.Split(val, []byte("_")) - var op OpType - if len(vals) == 2 { - op = OpType(vals[1][0]) - } - return vals[0], op -} - -func joinOwnerValues(vals ...[]byte) []byte { - return bytes.Join(vals, []byte("_")) -} - -// SetOwnerOpValue implements Manager.SetOwnerOpValue interface. -func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error { - // owner don't change. - ownerKey, ownerID, currOp, _, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key) - if err != nil { - return errors.Trace(err) - } - if currOp == op { - logutil.Logger(m.logCtx).Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op)) - return nil - } - if string(ownerID) != m.id { - return errors.New("ownerInfoNotMatch") - } - newOwnerVal := joinOwnerValues(ownerID, []byte{byte(op)}) - - failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) { - if valStr, ok := v.(string); ok { - if err := mockDelOwnerKey(valStr, ownerKey, m); err != nil { - failpoint.Return(err) - } - } - }) - - leaseOp := clientv3.WithLease(clientv3.LeaseID(m.sessionLease.Load())) - resp, err := m.etcdCli.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(ownerKey), "=", modRevision)). - Then(clientv3.OpPut(ownerKey, string(newOwnerVal), leaseOp)). - Commit() - if err == nil && !resp.Succeeded { - err = errors.New("put owner key failed, cmp is false") - } - logutil.BgLogger().Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID), - zap.Stringer("old Op", currOp), zap.Stringer("op", op), zap.Error(err)) - metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.PutValue+"_"+metrics.RetLabel(err)).Inc() - return errors.Trace(err) -} - -// GetOwnerOpValue gets the owner op value. -func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, logPrefix string) (OpType, error) { - // It's using for testing. - if etcdCli == nil { - return *mockOwnerOpValue.Load(), nil - } - - logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) - _, _, op, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath) - return op, errors.Trace(err) -} - -// WatchOwnerForTest watches the ownerKey. -// This function is used to test watchOwner(). -func WatchOwnerForTest(ctx context.Context, m Manager, etcdSession *concurrency.Session, key string, createRevison int64) error { - if ownerManager, ok := m.(*ownerManager); ok { - return ownerManager.watchOwner(ctx, etcdSession, key, createRevison) - } - return nil -} - -func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string, currRev int64) error { - logPrefix := fmt.Sprintf("[%s] ownerManager %s watch owner key %v", m.prompt, m.id, key) - logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) - logutil.BgLogger().Debug(logPrefix) - // we need to watch the ownerKey since currRev + 1. - watchCh := m.etcdCli.Watch(ctx, key, clientv3.WithRev(currRev+1)) - for { - select { - case resp, ok := <-watchCh: - if !ok { - metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.WatcherClosed).Inc() - logutil.Logger(logCtx).Info("watcher is closed, no owner") - return errors.Errorf("watcher is closed, key: %v", key) - } - if resp.Canceled { - metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Cancelled).Inc() - logutil.Logger(logCtx).Info("watch canceled, no owner") - return errors.Errorf("watch canceled, key: %v", key) - } - - for _, ev := range resp.Events { - if ev.Type == mvccpb.DELETE { - metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Deleted).Inc() - logutil.Logger(logCtx).Info("watch failed, owner is deleted") - return nil - } - } - case <-etcdSession.Done(): - metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.SessionDone).Inc() - return nil - case <-ctx.Done(): - metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.CtxDone).Inc() - return nil - } - } -} - -func init() { - err := setManagerSessionTTL() - if err != nil { - logutil.BgLogger().Warn("set manager session TTL failed", zap.Error(err)) - } -} - -// DeleteLeader deletes the leader key. -func DeleteLeader(ctx context.Context, cli *clientv3.Client, key string) error { - ownerKey, _, _, _, _, err := getOwnerInfo(ctx, ctx, cli, key) - if err != nil { - return errors.Trace(err) - } - _, err = cli.Delete(ctx, ownerKey) - return err -} - -// AcquireDistributedLock creates a mutex with ETCD client, and returns a mutex release function. -func AcquireDistributedLock( - ctx context.Context, - cli *clientv3.Client, - key string, - ttlInSec int, -) (release func(), err error) { - se, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlInSec)) - if err != nil { - return nil, err - } - mu := concurrency.NewMutex(se, key) - maxRetryCnt := 10 - err = util2.RunWithRetry(maxRetryCnt, util2.RetryInterval, func() (bool, error) { - err = mu.Lock(ctx) - if err != nil { - return true, err - } - return false, nil - }) - if err != nil { - err1 := se.Close() - if err1 != nil { - logutil.Logger(ctx).Warn("close session error", zap.Error(err1)) - } - return nil, err - } - logutil.Logger(ctx).Info("acquire distributed flush lock success", zap.String("key", key)) - return func() { - err = mu.Unlock(ctx) - if err != nil { - logutil.Logger(ctx).Warn("release distributed flush lock error", zap.Error(err), zap.String("key", key)) - } else { - logutil.Logger(ctx).Info("release distributed flush lock success", zap.String("key", key)) - } - err = se.Close() - if err != nil { - logutil.Logger(ctx).Warn("close session error", zap.Error(err)) - } - }, nil -} diff --git a/pkg/owner/manager_test.go b/pkg/owner/manager_test.go deleted file mode 100644 index 907ebe79de896..0000000000000 --- a/pkg/owner/manager_test.go +++ /dev/null @@ -1,562 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package owner_test - -import ( - "context" - "fmt" - "net/url" - "runtime" - "sync/atomic" - "testing" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - . "github.com/pingcap/tidb/pkg/ddl" - "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/owner" - "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/store/mockstore" - "github.com/pingcap/tidb/pkg/testkit" - "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/concurrency" - "go.etcd.io/etcd/server/v3/embed" - "go.etcd.io/etcd/tests/v3/integration" - "golang.org/x/exp/rand" -) - -const testLease = 5 * time.Millisecond - -type testInfo struct { - store kv.Storage - cluster *integration.ClusterV3 - client *clientv3.Client - ddl DDL -} - -func newTestInfo(t *testing.T) *testInfo { - store, err := mockstore.NewMockStore() - require.NoError(t, err) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 4}) - - cli := cluster.Client(0) - ic := infoschema.NewCache(nil, 2) - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d, _ := NewDDL( - context.Background(), - WithEtcdClient(cli), - WithStore(store), - WithLease(testLease), - WithInfoCache(ic), - ) - - return &testInfo{ - store: store, - cluster: cluster, - client: cli, - ddl: d, - } -} - -func (ti *testInfo) Close(t *testing.T) { - err := ti.ddl.Stop() - require.NoError(t, err) - err = ti.store.Close() - require.NoError(t, err) - ti.cluster.Terminate(t) -} - -type listener struct { - val atomic.Bool -} - -func (l *listener) OnBecomeOwner() { - l.val.Store(true) -} -func (l *listener) OnRetireOwner() { - l.val.Store(false) -} - -func TestSingle(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - integration.BeforeTestExternal(t) - - tInfo := newTestInfo(t) - client, d := tInfo.client, tInfo.ddl - defer tInfo.Close(t) - ownerManager := d.OwnerManager() - lis := &listener{} - ownerManager.SetListener(lis) - require.NoError(t, ownerManager.CampaignOwner()) - isOwner := checkOwner(d, true) - require.True(t, isOwner) - require.True(t, lis.val.Load()) - - // test for newSession failed - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - manager := owner.NewOwnerManager(ctx, client, "ddl", "ddl_id", DDLOwnerKey) - cancel() - - err := manager.CampaignOwner() - comment := fmt.Sprintf("campaigned result don't match, err %v", err) - require.True(t, terror.ErrorEqual(err, context.Canceled) || terror.ErrorEqual(err, context.DeadlineExceeded), comment) - - isOwner = checkOwner(d, true) - require.True(t, isOwner) - - // The test is used to exit campaign loop. - ownerManager.Cancel() - isOwner = checkOwner(d, false) - require.False(t, isOwner) - require.False(t, lis.val.Load()) - - time.Sleep(200 * time.Millisecond) - - // err is ok to be not nil since we canceled the manager. - ownerID, _ := manager.GetOwnerID(ctx) - require.Equal(t, "", ownerID) - op, _ := owner.GetOwnerOpValue(ctx, client, DDLOwnerKey, "log prefix") - require.Equal(t, op, owner.OpNone) -} - -func TestSetAndGetOwnerOpValue(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - integration.BeforeTestExternal(t) - - tInfo := newTestInfo(t) - defer tInfo.Close(t) - - require.NoError(t, tInfo.ddl.OwnerManager().CampaignOwner()) - isOwner := checkOwner(tInfo.ddl, true) - require.True(t, isOwner) - - // test set/get owner info - manager := tInfo.ddl.OwnerManager() - ownerID, err := manager.GetOwnerID(context.Background()) - require.NoError(t, err) - require.Equal(t, tInfo.ddl.GetID(), ownerID) - op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") - require.NoError(t, err) - require.Equal(t, op, owner.OpNone) - require.False(t, op.IsSyncedUpgradingState()) - err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) - require.NoError(t, err) - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") - require.NoError(t, err) - require.Equal(t, op, owner.OpSyncUpgradingState) - require.True(t, op.IsSyncedUpgradingState()) - // update the same as the original value - err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) - require.NoError(t, err) - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") - require.NoError(t, err) - require.Equal(t, op, owner.OpSyncUpgradingState) - require.True(t, op.IsSyncedUpgradingState()) - // test del owner key when SetOwnerOpValue - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey", `return("delOwnerKeyAndNotOwner")`)) - err = manager.SetOwnerOpValue(context.Background(), owner.OpNone) - require.Error(t, err, "put owner key failed, cmp is false") - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") - require.NotNil(t, err) - require.Equal(t, concurrency.ErrElectionNoLeader.Error(), err.Error()) - require.Equal(t, op, owner.OpNone) - require.False(t, op.IsSyncedUpgradingState()) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey")) - - // Let ddl run for the owner again. - require.NoError(t, tInfo.ddl.OwnerManager().CampaignOwner()) - isOwner = checkOwner(tInfo.ddl, true) - require.True(t, isOwner) - // Mock the manager become not owner because the owner is deleted(like TTL is timeout). - // And then the manager campaigns the owner again, and become the owner. - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey", `return("onlyDelOwnerKey")`)) - err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) - require.Error(t, err, "put owner key failed, cmp is false") - isOwner = checkOwner(tInfo.ddl, true) - require.True(t, isOwner) - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") - require.NoError(t, err) - require.Equal(t, op, owner.OpNone) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey")) -} - -// TestGetOwnerOpValueBeforeSet tests get owner opValue before set this value when the etcdClient is nil. -func TestGetOwnerOpValueBeforeSet(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockNotSetOwnerOp", `return(true)`)) - - _, dom := testkit.CreateMockStoreAndDomain(t) - ddl := dom.DDL() - require.NoError(t, ddl.OwnerManager().CampaignOwner()) - isOwner := checkOwner(ddl, true) - require.True(t, isOwner) - - // test set/get owner info - manager := ddl.OwnerManager() - ownerID, err := manager.GetOwnerID(context.Background()) - require.NoError(t, err) - require.Equal(t, ddl.GetID(), ownerID) - op, err := owner.GetOwnerOpValue(context.Background(), nil, DDLOwnerKey, "log prefix") - require.NoError(t, err) - require.Equal(t, op, owner.OpNone) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockNotSetOwnerOp")) - err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) - require.NoError(t, err) - op, err = owner.GetOwnerOpValue(context.Background(), nil, DDLOwnerKey, "log prefix") - require.NoError(t, err) - require.Equal(t, op, owner.OpSyncUpgradingState) -} - -func TestCluster(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - integration.BeforeTestExternal(t) - - originalTTL := owner.ManagerSessionTTL - owner.ManagerSessionTTL = 3 - defer func() { - owner.ManagerSessionTTL = originalTTL - }() - - tInfo := newTestInfo(t) - store, cluster, d := tInfo.store, tInfo.cluster, tInfo.ddl - defer tInfo.Close(t) - require.NoError(t, d.OwnerManager().CampaignOwner()) - - isOwner := checkOwner(d, true) - require.True(t, isOwner) - - cli1 := cluster.Client(1) - ic2 := infoschema.NewCache(nil, 2) - ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d1, _ := NewDDL( - context.Background(), - WithEtcdClient(cli1), - WithStore(store), - WithLease(testLease), - WithInfoCache(ic2), - ) - require.NoError(t, d1.OwnerManager().CampaignOwner()) - - isOwner = checkOwner(d1, false) - require.False(t, isOwner) - - // Delete the leader key, the d1 become the owner. - cliRW := cluster.Client(2) - err := deleteLeader(cliRW, DDLOwnerKey) - require.NoError(t, err) - - isOwner = checkOwner(d, false) - require.False(t, isOwner) - - d.OwnerManager().Cancel() - // d3 (not owner) stop - cli3 := cluster.Client(3) - ic3 := infoschema.NewCache(nil, 2) - ic3.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d3, _ := NewDDL( - context.Background(), - WithEtcdClient(cli3), - WithStore(store), - WithLease(testLease), - WithInfoCache(ic3), - ) - require.NoError(t, d3.OwnerManager().CampaignOwner()) - - isOwner = checkOwner(d3, false) - require.False(t, isOwner) - - d3.OwnerManager().Cancel() - // Cancel the owner context, there is no owner. - d1.OwnerManager().Cancel() - - logPrefix := fmt.Sprintf("[ddl] %s ownerManager %s", DDLOwnerKey, "useless id") - logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) - _, _, err = owner.GetOwnerKeyInfo(context.Background(), logCtx, cliRW, DDLOwnerKey, "useless id") - require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) - op, err := owner.GetOwnerOpValue(context.Background(), cliRW, DDLOwnerKey, logPrefix) - require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) - require.Equal(t, op, owner.OpNone) -} - -func TestWatchOwner(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - integration.BeforeTestExternal(t) - - tInfo := newTestInfo(t) - client, d := tInfo.client, tInfo.ddl - defer tInfo.Close(t) - ownerManager := d.OwnerManager() - lis := &listener{} - ownerManager.SetListener(lis) - require.NoError(t, ownerManager.CampaignOwner()) - isOwner := checkOwner(d, true) - require.True(t, isOwner) - - // get the owner id. - ctx := context.Background() - id, err := ownerManager.GetOwnerID(ctx) - require.NoError(t, err) - - // create etcd session. - session, err := concurrency.NewSession(client) - require.NoError(t, err) - - // test the GetOwnerKeyInfo() - ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, DDLOwnerKey, id) - require.NoError(t, err) - - // watch the ownerKey. - ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*300) - defer cancel2() - watchDone := make(chan bool) - watched := false - go func() { - watchErr := owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision) - require.NoError(t, watchErr) - watchDone <- true - }() - - select { - case watched = <-watchDone: - case <-ctx2.Done(): - } - require.False(t, watched) - - // delete the owner, and can watch the DELETE event. - err = deleteLeader(client, DDLOwnerKey) - require.NoError(t, err) - watched = <-watchDone - require.True(t, watched) - - // the ownerKey has been deleted, watch ownerKey again, it can be watched. - go func() { - watchErr := owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision) - require.NoError(t, watchErr) - watchDone <- true - }() - - watched = <-watchDone - require.True(t, watched) -} - -func TestWatchOwnerAfterDeleteOwnerKey(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - integration.BeforeTestExternal(t) - - tInfo := newTestInfo(t) - client, d := tInfo.client, tInfo.ddl - defer tInfo.Close(t) - ownerManager := d.OwnerManager() - lis := &listener{} - ownerManager.SetListener(lis) - require.NoError(t, ownerManager.CampaignOwner()) - isOwner := checkOwner(d, true) - require.True(t, isOwner) - - // get the owner id. - ctx := context.Background() - id, err := ownerManager.GetOwnerID(ctx) - require.NoError(t, err) - session, err := concurrency.NewSession(client) - require.NoError(t, err) - - // get the ownkey informations. - ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, DDLOwnerKey, id) - require.NoError(t, err) - - // delete the ownerkey - err = deleteLeader(client, DDLOwnerKey) - require.NoError(t, err) - - // watch the ownerKey with the current revisoin. - watchDone := make(chan bool) - go func() { - watchErr := owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision) - require.NoError(t, watchErr) - watchDone <- true - }() - <-watchDone -} - -func checkOwner(d DDL, fbVal bool) (isOwner bool) { - manager := d.OwnerManager() - // The longest to wait for 30 seconds to - // make sure that campaigning owners is completed. - for i := 0; i < 6000; i++ { - time.Sleep(5 * time.Millisecond) - isOwner = manager.IsOwner() - if isOwner == fbVal { - break - } - } - return -} - -func deleteLeader(cli *clientv3.Client, prefixKey string) error { - session, err := concurrency.NewSession(cli) - if err != nil { - return errors.Trace(err) - } - defer func() { - _ = session.Close() - }() - election := concurrency.NewElection(session, prefixKey) - resp, err := election.Leader(context.Background()) - if err != nil { - return errors.Trace(err) - } - _, err = cli.Delete(context.Background(), string(resp.Kvs[0].Key)) - return errors.Trace(err) -} - -func TestImmediatelyCancel(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - integration.BeforeTestExternal(t) - - tInfo := newTestInfo(t) - d := tInfo.ddl - defer tInfo.Close(t) - ownerManager := d.OwnerManager() - for i := 0; i < 10; i++ { - err := ownerManager.CampaignOwner() - require.NoError(t, err) - ownerManager.CampaignCancel() - } -} - -func TestAcquireDistributedLock(t *testing.T) { - const addrFmt = "http://127.0.0.1:%d" - cfg := embed.NewConfig() - cfg.Dir = t.TempDir() - // rand port in [20000, 60000) - randPort := int(rand.Int31n(40000)) + 20000 - clientAddr := fmt.Sprintf(addrFmt, randPort) - lcurl, _ := url.Parse(clientAddr) - cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*lcurl}, []url.URL{*lcurl} - lpurl, _ := url.Parse(fmt.Sprintf(addrFmt, randPort+1)) - cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*lpurl}, []url.URL{*lpurl} - cfg.InitialCluster = "default=" + lpurl.String() - cfg.Logger = "zap" - embedEtcd, err := embed.StartEtcd(cfg) - require.NoError(t, err) - <-embedEtcd.Server.ReadyNotify() - t.Cleanup(func() { - embedEtcd.Close() - }) - makeEtcdCli := func(t *testing.T) (cli *clientv3.Client) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{lcurl.String()}, - }) - require.NoError(t, err) - t.Cleanup(func() { - cli.Close() - }) - return cli - } - t.Run("acquire distributed lock with same client", func(t *testing.T) { - cli := makeEtcdCli(t) - getLock := make(chan struct{}) - ctx := context.Background() - - release1, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10) - require.NoError(t, err) - var wg util.WaitGroupWrapper - wg.Run(func() { - // Acquire another distributed lock will be blocked. - release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10) - require.NoError(t, err) - getLock <- struct{}{} - release2() - }) - timer := time.NewTimer(300 * time.Millisecond) - select { - case <-getLock: - require.FailNow(t, "acquired same lock unexpectedly") - case <-timer.C: - release1() - <-getLock - } - wg.Wait() - - release1, err = owner.AcquireDistributedLock(ctx, cli, "test-lock/1", 10) - require.NoError(t, err) - release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock/2", 10) - require.NoError(t, err) - release1() - release2() - }) - - t.Run("acquire distributed lock with different clients", func(t *testing.T) { - cli1 := makeEtcdCli(t) - cli2 := makeEtcdCli(t) - - getLock := make(chan struct{}) - ctx := context.Background() - - release1, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 10) - require.NoError(t, err) - var wg util.WaitGroupWrapper - wg.Run(func() { - // Acquire another distributed lock will be blocked. - release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10) - require.NoError(t, err) - getLock <- struct{}{} - release2() - }) - timer := time.NewTimer(300 * time.Millisecond) - select { - case <-getLock: - require.FailNow(t, "acquired same lock unexpectedly") - case <-timer.C: - release1() - <-getLock - } - wg.Wait() - }) - - t.Run("acquire distributed lock until timeout", func(t *testing.T) { - cli1 := makeEtcdCli(t) - cli2 := makeEtcdCli(t) - ctx := context.Background() - - _, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 1) - require.NoError(t, err) - cli1.Close() // Note that release() is not invoked. - - release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10) - require.NoError(t, err) - release2() - }) -}