Skip to content

Commit

Permalink
resolve conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Feb 10, 2025
1 parent a584801 commit 8752a71
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 1,123 deletions.
4 changes: 0 additions & 4 deletions owner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ go_test(
],
embed = [":owner"],
flaky = True,
<<<<<<< HEAD:owner/BUILD.bazel
=======
shard_count = 9,
>>>>>>> afdd5c2ecd5 (owner: fix data race on ownerManager.campaignCancel (#56362)):pkg/owner/BUILD.bazel
deps = [
"//ddl",
"//infoschema",
Expand Down
8 changes: 4 additions & 4 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ func (m *ownerManager) CampaignOwner() error {
return errors.Trace(err)
}
m.wg.Add(1)
go m.campaignLoop(session)
var campaignContext context.Context
campaignContext, m.campaignCancel = context.WithCancel(m.ctx)
go m.campaignLoop(campaignContext, session)
return nil
}

Expand Down Expand Up @@ -194,9 +196,7 @@ func (m *ownerManager) CampaignCancel() {
m.wg.Wait()
}

func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
var campaignContext context.Context
campaignContext, m.campaignCancel = context.WithCancel(m.ctx)
func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession *concurrency.Session) {
defer func() {
m.campaignCancel()
if r := recover(); r != nil {
Expand Down
34 changes: 34 additions & 0 deletions owner/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ import (

const testLease = 5 * time.Millisecond

type testInfo struct {
cluster *integration.ClusterV3
client *clientv3.Client
}

func newTestInfo(t *testing.T) *testInfo {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
return &testInfo{
cluster: cluster,
client: cluster.Client(0),
}
}

func (ti *testInfo) Close(t *testing.T) {
ti.cluster.Terminate(t)
}

func TestSingle(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
Expand Down Expand Up @@ -213,3 +230,20 @@ func deleteLeader(cli *clientv3.Client, prefixKey string) error {
_, err = cli.Delete(context.Background(), string(resp.Kvs[0].Key))
return errors.Trace(err)
}

func TestImmediatelyCancel(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
}
integration.BeforeTestExternal(t)

tInfo := newTestInfo(t)
defer tInfo.Close(t)
ownerMgr := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "1", "/owner/key")
defer ownerMgr.Cancel()
for i := 0; i < 10; i++ {
err := ownerMgr.CampaignOwner()
require.NoError(t, err)
ownerMgr.CampaignCancel()
}
}
Loading

0 comments on commit 8752a71

Please sign in to comment.