Skip to content

Commit

Permalink
Cherry-picks for 2.10.21-RC.3 (#5921)
Browse files Browse the repository at this point in the history
Incudes the following:

- #5910
- #5914
- #5917

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander authored Sep 24, 2024
2 parents 0a32987 + e958559 commit 1e29e3a
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 16 deletions.
6 changes: 6 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2764,6 +2764,12 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
return
}

// This should only be coming from the System Account.
if acc != s.SystemAccount() {
s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
return
}

js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
return
Expand Down
12 changes: 8 additions & 4 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,10 +583,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
return false
}
s := js.srv
js.mu.RUnlock()

// Capture RAFT node from assignment.
node := ca.Group.node
js.mu.RUnlock()

// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
Expand Down Expand Up @@ -1039,7 +1038,7 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool {
ourID := cc.meta.ID()
for _, peer := range rg.Peers {
if peer == ourID {
if len(rg.Peers) == 1 || rg.node != nil && rg.node.Leader() {
if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) {
return true
}
}
Expand Down Expand Up @@ -1201,7 +1200,12 @@ func (js *jetStream) checkForOrphans() {
stream = mset.cfg.Name
mset.mu.RUnlock()
}
s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
if o.isDurable() {
s.Warnf("Detected orphaned durable consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
} else {
s.Debugf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
}

if err := o.delete(); err != nil {
s.Warnf("Deleting consumer encountered an error: %v", err)
}
Expand Down
37 changes: 37 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6403,6 +6403,43 @@ func TestJetStreamClusterConsumerDeleteInterestPolicyPerf(t *testing.T) {
require_Equal(t, si.State.Msgs, 0)
}

// Make sure to not allow non-system accounts to move meta leader.
func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

s := c.randomServer()

// Client based API
nc, _ := jsClientConnect(t, s)
defer nc.Close()

ml := c.leader()

_, err := nc.Request(JSApiLeaderStepDown, nil, time.Second)
require_Error(t, err, nats.ErrTimeout)

// Make sure we did not move.
c.waitOnLeader()
require_Equal(t, ml, c.leader())

// System user can move it.
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

resp, err := snc.Request(JSApiLeaderStepDown, nil, time.Second)
require_NoError(t, err)

var sdr JSApiLeaderStepDownResponse
require_NoError(t, json.Unmarshal(resp.Data, &sdr))
require_True(t, sdr.Success)
require_Equal(t, sdr.Error, nil)

// Make sure we did move this time.
c.waitOnLeader()
require_NotEqual(t, ml, c.leader())
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
11 changes: 9 additions & 2 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4900,8 +4900,9 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
}
`

_, syspub := createKey(t)
sysKp, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
sysCreds := newUser(t, sysKp)

accKp, aExpPub := createKey(t)
accClaim := jwt.NewAccountClaims(aExpPub)
Expand Down Expand Up @@ -4994,6 +4995,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
// Move our R3 stream leader and make sure acounting is correct.
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(aExpPub, "TEST1")

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

Expand Down Expand Up @@ -5025,6 +5027,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(aExpPub, "TEST1")

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

Expand All @@ -5042,11 +5045,15 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

// Need system user here to move the leader.
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserCredentials(sysCreds))
defer snc.Close()

requestLeaderStepDown := func() {
ml := c.leader()
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
if cml := c.leader(); cml == ml {
nc.Request(JSApiLeaderStepDown, nil, time.Second)
snc.Request(JSApiLeaderStepDown, nil, time.Second)
return fmt.Errorf("Metaleader has not moved yet")
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3361,13 +3361,13 @@ func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) {
defer sCluster.shutdown()

requestLeaderStepDown := func(clientURL string) error {
nc, err := nats.Connect(clientURL)
nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
return err
}
defer nc.Close()

ncResp, err := nc.Request(JSApiLeaderStepDown, nil, 3*time.Second)
ncResp, err := nc.Request(JSApiLeaderStepDown, nil, time.Second)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6542,6 +6542,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) {
listen: 127.0.0.1:%d
routes = [%s]
}
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
storeCnf := func(serverName, clusterName, storeDir, conf string) string {
switch serverName {
Expand All @@ -6566,7 +6567,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) {
defer cluster.shutdown()

requestLeaderStepDown := func(clientURL string) error {
nc, err := nats.Connect(clientURL)
nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
return err
}
Expand Down
11 changes: 9 additions & 2 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1882,7 +1882,10 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
doneCh := make(chan bool)

if sl == sc.leader() {
nc.Request(JSApiLeaderStepDown, nil, time.Second)
snc, _ := jsClientConnect(t, sc.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()
_, err := snc.Request(JSApiLeaderStepDown, nil, time.Second)
require_NoError(t, err)
sc.waitOnLeader()
}

Expand Down Expand Up @@ -10315,7 +10318,11 @@ func TestNoRaceWQAndMultiSubjectFiltersRace(t *testing.T) {
return nil
}
// Move meta-leader since stream can be R1.
nc.Request(JSApiLeaderStepDown, nil, time.Second)
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()
if _, err := snc.Request(JSApiLeaderStepDown, nil, time.Second); err != nil {
return err
}
return fmt.Errorf("stream leader on meta-leader")
})

Expand Down
19 changes: 16 additions & 3 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,15 @@ func TestNRGStepDownOnSameTermDoesntClearVote(t *testing.T) {
}

func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) {
// This test relies on nodes not hitting their election timer too often,
// otherwise the step later where we capture the election time before and
// after the failed vote request will flake.
origMinTimeout, origMaxTimeout, origHBInterval := minElectionTimeout, maxElectionTimeout, hbInterval
minElectionTimeout, maxElectionTimeout, hbInterval = time.Second*5, time.Second*10, time.Second*10
defer func() {
minElectionTimeout, maxElectionTimeout, hbInterval = origMinTimeout, origMaxTimeout, origHBInterval
}()

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
c.waitOnLeader()
Expand All @@ -379,6 +388,8 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) {
// time so that it guarantees that both the leader and the follower aren't
// operating at the time we take the etlr snapshots.
rg.lockAll()
leader.resetElect(maxElectionTimeout)
follower.resetElect(maxElectionTimeout)
leaderOriginal := leader.etlr
followerOriginal := follower.etlr
vr := &voteRequest{
Expand All @@ -404,9 +415,11 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) {
// Neither the leader nor our chosen follower should have updated their
// election timer as a result of this.
rg.lockAll()
defer rg.unlockAll()
require_True(t, leaderOriginal.Equal(leader.etlr))
require_True(t, followerOriginal.Equal(follower.etlr))
leaderEqual := leaderOriginal.Equal(leader.etlr)
followerEqual := followerOriginal.Equal(follower.etlr)
rg.unlockAll()
require_True(t, leaderEqual)
require_True(t, followerEqual)
}

func TestNRGInvalidTAVDoesntPanic(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
mset.store.FastState(&state)

// Possible race with consumer.setLeader during recovery.
mset.mu.RLock()
mset.mu.Lock()
mset.lseq = state.LastSeq
mset.mu.RUnlock()
mset.mu.Unlock()

// If no msgs (new stream), set dedupe state loaded to true.
if state.Msgs == 0 {
Expand Down

0 comments on commit 1e29e3a

Please sign in to comment.