Skip to content

Commit

Permalink
Add e2e test cases to reproduce the lease revoke issue
Browse files Browse the repository at this point in the history
Refer to etcd-io#15247

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
  • Loading branch information
ahrtr committed Dec 15, 2023
1 parent 2f03bbc commit f7e488d
Showing 1 changed file with 160 additions and 0 deletions.
160 changes: 160 additions & 0 deletions tests/e2e/v3_lease_no_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

// TestLeaseRevokeByOldLeader verifies that leases shouldn't be revoked by
// old leader.
// See the first case in the section "Root Cause" in
// https://docs.google.com/document/d/1peLDjwebnSuNR69ombuUiJ5fkSw7Rd3Slv_mUpOLsmg/edit
func TestLeaseRevokeByOldLeader(t *testing.T) {
testLeaseRevokeIssue(t, func(cluster *e2e.EtcdProcessCluster, leaderIndex int) []string {
// return the endpoint of a follower
return cluster.Procs[(leaderIndex+1)%len(cluster.Procs)].EndpointsGRPC()
})
}

// TestLeaseRevokeByOldLeader verifies that leases shouldn't be revoked by
// new leader.
// See the second case in the section "Root Cause" in
// https://docs.google.com/document/d/1peLDjwebnSuNR69ombuUiJ5fkSw7Rd3Slv_mUpOLsmg/edit
func TestLeaseRevokeByNewLeader(t *testing.T) {
testLeaseRevokeIssue(t, func(cluster *e2e.EtcdProcessCluster, leaderIndex int) []string {
// return all members' endpoints, so that the client can switch to
// other member in case the leader gets stuck on writing disk.
return cluster.EndpointsGRPC()
})
}

func testLeaseRevokeIssue(t *testing.T, epsFn func(cluster *e2e.EtcdProcessCluster, leaderIndex int) []string) {
e2e.BeforeTest(t)

ctx := context.Background()

t.Log("Starting a new etcd cluster")
epc, err := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithClusterSize(3),
e2e.WithGoFailEnabled(true),
e2e.WithGoFailClientTimeout(40*time.Second),
)
require.NoError(t, err)
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}()

leaderIdx := epc.WaitLeader(t)
t.Logf("Leader index: %d", leaderIdx)

epsForNormalOperations := epc.Procs[(leaderIdx+2)%3].EndpointsGRPC()
t.Logf("Creating a client for normal operations: %v", epsForNormalOperations)
client, err := clientv3.New(clientv3.Config{Endpoints: epsForNormalOperations, DialTimeout: 3 * time.Second})
require.NoError(t, err)
defer client.Close()

epsForLeaseKeepAlive := epsFn(epc, leaderIdx)
t.Logf("Creating a client for the leaseKeepAlive operation: %v", epsForLeaseKeepAlive)
clientForKeepAlive, err := clientv3.New(clientv3.Config{Endpoints: epsForLeaseKeepAlive, DialTimeout: 3 * time.Second})
require.NoError(t, err)
defer clientForKeepAlive.Close()

resp, err := client.Status(ctx, epsForNormalOperations[0])
require.NoError(t, err)
oldLeaderId := resp.Leader

t.Log("Creating a new lease")
leaseRsp, err := client.Grant(ctx, 20)
require.NoError(t, err)

t.Log("Starting a goroutine to keep alive the lease")
doneC := make(chan struct{})
stopC := make(chan struct{})
startC := make(chan struct{}, 1)
go func() {
defer close(doneC)

respC, kerr := clientForKeepAlive.KeepAlive(ctx, leaseRsp.ID)
require.NoError(t, kerr)
// ensure we have received the first response from the server
<-respC
startC <- struct{}{}

for {
select {
case <-stopC:
return
case <-respC:
}
}
}()

t.Log("Wait for the keepAlive goroutine to get started")
<-startC

t.Log("Trigger the failpoint to simulate stalled writing")
err = epc.Procs[leaderIdx].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("30s")`)
require.NoError(t, err)

cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Logf("Waiting for a new leader to be elected, old leader index: %d, old leader ID: %d", leaderIdx, oldLeaderId)
testutils.ExecuteUntil(cctx, t, func() {
for {
resp, err = client.Status(ctx, epsForNormalOperations[0])
if err == nil && resp.Leader != oldLeaderId {
t.Logf("A new leader has already been elected, new leader index: %d", resp.Leader)
return
}
time.Sleep(100 * time.Millisecond)
}
})
cancel()

t.Log("Writing a key/value pair")
_, err = client.Put(ctx, "foo", "bar")
require.NoError(t, err)

t.Log("Sleeping 30 seconds")
time.Sleep(30 * time.Second)

t.Log("Remove the failpoint 'raftBeforeSave'")
err = epc.Procs[leaderIdx].Failpoints().DeactivateHTTP(ctx, "raftBeforeSave")
require.NoError(t, err)

// By default, etcd tries to revoke leases every 7 seconds.
t.Log("Sleeping 10 seconds")
time.Sleep(10 * time.Second)

t.Log("Confirming the lease isn't revoked")
leases, err := client.Leases(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(leases.Leases))

t.Log("Waiting for the keepAlive goroutine to exit")
close(stopC)
<-doneC
}

0 comments on commit f7e488d

Please sign in to comment.