Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] Support linearizable renew lease for 3.4 #14177

Merged
merged 1 commit into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ var (
ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
ErrGRPCTimeoutWaitAppliedIndex = status.New(codes.Unavailable, "etcdserver: request timed out, waiting for the applied index took too long").Err()
ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
Expand Down Expand Up @@ -132,6 +133,7 @@ var (
ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail,
ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost,
ErrorDesc(ErrGRPCTimeoutWaitAppliedIndex): ErrGRPCTimeoutWaitAppliedIndex,
ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy,
ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
ErrorDesc(ErrGPRCNotSupportedForLearner): ErrGPRCNotSupportedForLearner,
Expand Down Expand Up @@ -192,6 +194,7 @@ var (
ErrTimeout = Error(ErrGRPCTimeout)
ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
ErrTimeoutWaitAppliedIndex = Error(ErrGRPCTimeoutWaitAppliedIndex)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrCorrupt = Error(ErrGRPCCorrupt)
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)
Expand Down
1 change: 1 addition & 0 deletions etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex,
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
Expand Down
1 change: 1 addition & 0 deletions etcdserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutWaitAppliedIndex = errors.New("etcdserver: request timed out, waiting for the applied index took too long")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
Expand Down
56 changes: 56 additions & 0 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1850,3 +1850,59 @@ func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) {
}
s.sendC <- send
}

func TestWaitAppliedIndex(t *testing.T) {
cases := []struct {
name string
appliedIndex uint64
committedIndex uint64
action func(s *EtcdServer)
ExpectedError error
}{
{
name: "The applied Id is already equal to the commitId",
appliedIndex: 10,
committedIndex: 10,
action: func(s *EtcdServer) {
s.applyWait.Trigger(10)
},
ExpectedError: nil,
},
{
name: "The etcd server has already stopped",
appliedIndex: 10,
committedIndex: 12,
action: func(s *EtcdServer) {
s.stopping <- struct{}{}
},
ExpectedError: ErrStopped,
},
{
name: "Timed out waiting for the applied index",
appliedIndex: 10,
committedIndex: 12,
action: nil,
ExpectedError: ErrTimeoutWaitAppliedIndex,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
s := &EtcdServer{
appliedIndex: tc.appliedIndex,
committedIndex: tc.committedIndex,
stopping: make(chan struct{}, 1),
applyWait: wait.NewTimeList(),
}

if tc.action != nil {
go tc.action(s)
}

err := s.waitAppliedIndex()

if err != tc.ExpectedError {
t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err)
}
})
}
}
43 changes: 34 additions & 9 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (
// We should stop accepting new proposals if the gap growing to a certain point.
maxGapBetweenApplyAndCommitIndex = 5000
traceThreshold = 100 * time.Millisecond

// The timeout for the node to catch up its applied index, and is used in
// lease related operations, such as LeaseRenew and LeaseTimeToLive.
applyTimeout = time.Second
)

type RaftKV interface {
Expand Down Expand Up @@ -257,6 +261,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
return resp.(*pb.LeaseGrantResponse), nil
}

func (s *EtcdServer) waitAppliedIndex() error {
select {
case <-s.ApplyWait():
case <-s.stopping:
return ErrStopped
case <-time.After(applyTimeout):
return ErrTimeoutWaitAppliedIndex
}

return nil
}

func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
if err != nil {
Expand All @@ -266,26 +282,32 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
}

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}

ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
}

cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil && err != nil {
for cctx.Err() == nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
Expand All @@ -299,7 +321,10 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
}

func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
if s.Leader() == s.ID() {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return nil, err
}
// primary; timetolive directly from leader
le := s.lessor.Lookup(lease.LeaseID(r.ID))
if le == nil {
Expand Down
4 changes: 2 additions & 2 deletions integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,13 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient
defer cancel()
errc := make(chan error)

for i := 0; i < 30; i++ {
for i := 0; i < 100; i++ {
for j := 0; j < 3; j++ {
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
}
}

for i := 0; i < 90; i++ {
for i := 0; i < 300; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
Expand Down