Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.21-RC.3 #5921

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2764,6 +2764,12 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
return
}

// This should only be coming from the System Account.
if acc != s.SystemAccount() {
s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
return
}

js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
return
Expand Down
12 changes: 8 additions & 4 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,10 +583,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
return false
}
s := js.srv
js.mu.RUnlock()

// Capture RAFT node from assignment.
node := ca.Group.node
js.mu.RUnlock()

// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
Expand Down Expand Up @@ -1039,7 +1038,7 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool {
ourID := cc.meta.ID()
for _, peer := range rg.Peers {
if peer == ourID {
if len(rg.Peers) == 1 || rg.node != nil && rg.node.Leader() {
if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) {
return true
}
}
Expand Down Expand Up @@ -1201,7 +1200,12 @@ func (js *jetStream) checkForOrphans() {
stream = mset.cfg.Name
mset.mu.RUnlock()
}
s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
if o.isDurable() {
s.Warnf("Detected orphaned durable consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
} else {
s.Debugf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
}

if err := o.delete(); err != nil {
s.Warnf("Deleting consumer encountered an error: %v", err)
}
Expand Down
37 changes: 37 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6403,6 +6403,43 @@ func TestJetStreamClusterConsumerDeleteInterestPolicyPerf(t *testing.T) {
require_Equal(t, si.State.Msgs, 0)
}

// TestJetStreamClusterMetaStepdownFromNonSysAccount checks that a meta leader
// stepdown request sent by a non-system account user does not move the meta
// leader, while the same request sent by a system account user does.
func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Regular client connection against a randomly picked server.
	nc, _ := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Remember who the meta leader is before any stepdown attempt.
	origLeader := c.leader()

	// The request from this connection is expected to time out.
	_, err := nc.Request(JSApiLeaderStepDown, nil, time.Second)
	require_Error(t, err, nats.ErrTimeout)

	// The meta leader must be unchanged.
	c.waitOnLeader()
	require_Equal(t, origLeader, c.leader())

	// A system user is allowed to move the meta leader.
	sysConn, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
	defer sysConn.Close()

	reply, err := sysConn.Request(JSApiLeaderStepDown, nil, time.Second)
	require_NoError(t, err)

	var stepDown JSApiLeaderStepDownResponse
	require_NoError(t, json.Unmarshal(reply.Data, &stepDown))
	require_True(t, stepDown.Success)
	require_Equal(t, stepDown.Error, nil)

	// This time the meta leader must have changed.
	c.waitOnLeader()
	require_NotEqual(t, origLeader, c.leader())
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
11 changes: 9 additions & 2 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4900,8 +4900,9 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
}
`

_, syspub := createKey(t)
sysKp, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
sysCreds := newUser(t, sysKp)

accKp, aExpPub := createKey(t)
accClaim := jwt.NewAccountClaims(aExpPub)
Expand Down Expand Up @@ -4994,6 +4995,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
// Move our R3 stream leader and make sure accounting is correct.
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(aExpPub, "TEST1")

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

Expand Down Expand Up @@ -5025,6 +5027,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(aExpPub, "TEST1")

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

Expand All @@ -5042,11 +5045,15 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

// Need system user here to move the leader.
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserCredentials(sysCreds))
defer snc.Close()

requestLeaderStepDown := func() {
ml := c.leader()
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
if cml := c.leader(); cml == ml {
nc.Request(JSApiLeaderStepDown, nil, time.Second)
snc.Request(JSApiLeaderStepDown, nil, time.Second)
return fmt.Errorf("Metaleader has not moved yet")
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3361,13 +3361,13 @@ func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) {
defer sCluster.shutdown()

requestLeaderStepDown := func(clientURL string) error {
nc, err := nats.Connect(clientURL)
nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
return err
}
defer nc.Close()

ncResp, err := nc.Request(JSApiLeaderStepDown, nil, 3*time.Second)
ncResp, err := nc.Request(JSApiLeaderStepDown, nil, time.Second)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6542,6 +6542,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) {
listen: 127.0.0.1:%d
routes = [%s]
}
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
storeCnf := func(serverName, clusterName, storeDir, conf string) string {
switch serverName {
Expand All @@ -6566,7 +6567,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) {
defer cluster.shutdown()

requestLeaderStepDown := func(clientURL string) error {
nc, err := nats.Connect(clientURL)
nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
return err
}
Expand Down
11 changes: 9 additions & 2 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1882,7 +1882,10 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
doneCh := make(chan bool)

if sl == sc.leader() {
nc.Request(JSApiLeaderStepDown, nil, time.Second)
snc, _ := jsClientConnect(t, sc.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()
_, err := snc.Request(JSApiLeaderStepDown, nil, time.Second)
require_NoError(t, err)
sc.waitOnLeader()
}

Expand Down Expand Up @@ -10315,7 +10318,11 @@ func TestNoRaceWQAndMultiSubjectFiltersRace(t *testing.T) {
return nil
}
// Move meta-leader since stream can be R1.
nc.Request(JSApiLeaderStepDown, nil, time.Second)
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()
if _, err := snc.Request(JSApiLeaderStepDown, nil, time.Second); err != nil {
return err
}
return fmt.Errorf("stream leader on meta-leader")
})

Expand Down
19 changes: 16 additions & 3 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,15 @@ func TestNRGStepDownOnSameTermDoesntClearVote(t *testing.T) {
}

func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) {
// This test relies on nodes not hitting their election timer too often,
// otherwise the step later where we capture the election time before and
// after the failed vote request will flake.
origMinTimeout, origMaxTimeout, origHBInterval := minElectionTimeout, maxElectionTimeout, hbInterval
minElectionTimeout, maxElectionTimeout, hbInterval = time.Second*5, time.Second*10, time.Second*10
defer func() {
minElectionTimeout, maxElectionTimeout, hbInterval = origMinTimeout, origMaxTimeout, origHBInterval
}()

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
c.waitOnLeader()
Expand All @@ -379,6 +388,8 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) {
// time so that it guarantees that both the leader and the follower aren't
// operating at the time we take the etlr snapshots.
rg.lockAll()
leader.resetElect(maxElectionTimeout)
follower.resetElect(maxElectionTimeout)
leaderOriginal := leader.etlr
followerOriginal := follower.etlr
vr := &voteRequest{
Expand All @@ -404,9 +415,11 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) {
// Neither the leader nor our chosen follower should have updated their
// election timer as a result of this.
rg.lockAll()
defer rg.unlockAll()
require_True(t, leaderOriginal.Equal(leader.etlr))
require_True(t, followerOriginal.Equal(follower.etlr))
leaderEqual := leaderOriginal.Equal(leader.etlr)
followerEqual := followerOriginal.Equal(follower.etlr)
rg.unlockAll()
require_True(t, leaderEqual)
require_True(t, followerEqual)
}

func TestNRGInvalidTAVDoesntPanic(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
mset.store.FastState(&state)

// Possible race with consumer.setLeader during recovery.
mset.mu.RLock()
mset.mu.Lock()
mset.lseq = state.LastSeq
mset.mu.RUnlock()
mset.mu.Unlock()

// If no msgs (new stream), set dedupe state loaded to true.
if state.Msgs == 0 {
Expand Down