Skip to content

Commit

Permalink
Update conssitent_index when applying fails
Browse files Browse the repository at this point in the history
When clients have no permission to perform whatever operation, then
the applying may fail. We should also move consistent_index forward
in this case, otherwise the consitent_index may smaller than the
snapshot index.
  • Loading branch information
ahrtr committed Apr 14, 2022
1 parent 0452fee commit c870bbb
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
6 changes: 4 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2166,11 +2166,14 @@ func (s *EtcdServer) apply(
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
var ar *applyResult
applyV3Performed := false
defer func() {
// The txPostLock callback will not get called in this case,
// so we should set the consistent index directly.
if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 {
if s.consistIndex != nil &&
membership.ApplyBoth == shouldApplyV3 &&
(!applyV3Performed || (ar != nil && ar.err != nil)) {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
Expand Down Expand Up @@ -2220,7 +2223,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
id = raftReq.Header.ID
}

var ar *applyResult
needResult := s.w.IsRegistered(id)
if needResult || !noSideEffect(&raftReq) {
if !needResult && raftReq.Txn != nil {
Expand Down
27 changes: 27 additions & 0 deletions tests/integration/v3_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,33 @@ func TestV3AuthEmptyUserGet(t *testing.T) {
}
}

// TestV3AuthEmptyUserPut ensures that a put with an empty user will return an empty user error,
// and the consistent_index should be moved forward even the apply-->Put fails.
func TestV3AuthEmptyUserPut(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{
Size: 1,
SnapshotCount: 3,
})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()

api := toGRPC(clus.Client(0))
authSetupRoot(t, api.Auth)

// The SnapshotCount is 3, so there must be at least 3 new snapshot files being created.
// The VERIFY logic will check whether the consistent_index >= last snapshot index on
// cluster terminating.
for i := 0; i < 10; i++ {
_, err := api.KV.Put(ctx, &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if !eqErrGRPC(err, rpctypes.ErrUserEmpty) {
t.Fatalf("got %v, expected %v", err, rpctypes.ErrUserEmpty)
}
}
}

// TestV3AuthTokenWithDisable tests that auth won't crash if
// given a valid token when authentication is disabled
func TestV3AuthTokenWithDisable(t *testing.T) {
Expand Down

0 comments on commit c870bbb

Please sign in to comment.