Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Limit stepdown to system account. #5914

Merged
merged 3 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2810,6 +2810,12 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
return
}

// This should only be coming from the System Account.
if acc != s.SystemAccount() {
s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
Jarema marked this conversation as resolved.
Show resolved Hide resolved
return
}

js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
return
Expand Down
5 changes: 2 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,10 +583,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
return false
}
s := js.srv
js.mu.RUnlock()

// Capture RAFT node from assignment.
node := ca.Group.node
js.mu.RUnlock()

// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
Expand Down Expand Up @@ -1039,7 +1038,7 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool {
ourID := cc.meta.ID()
for _, peer := range rg.Peers {
if peer == ourID {
if len(rg.Peers) == 1 || rg.node != nil && rg.node.Leader() {
if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) {
return true
}
}
Expand Down
37 changes: 37 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6405,6 +6405,43 @@ func TestJetStreamClusterConsumerDeleteInterestPolicyPerf(t *testing.T) {
require_Equal(t, si.State.Msgs, 0)
}

// Make sure to not allow non-system accounts to move meta leader.
func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

s := c.randomServer()

// Client based API
nc, _ := jsClientConnect(t, s)
defer nc.Close()

ml := c.leader()

_, err := nc.Request(JSApiLeaderStepDown, nil, time.Second)
require_Error(t, err, nats.ErrTimeout)

// Make sure we did not move.
c.waitOnLeader()
require_Equal(t, ml, c.leader())

// System user can move it.
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

resp, err := snc.Request(JSApiLeaderStepDown, nil, time.Second)
require_NoError(t, err)

var sdr JSApiLeaderStepDownResponse
require_NoError(t, json.Unmarshal(resp.Data, &sdr))
require_True(t, sdr.Success)
require_Equal(t, sdr.Error, nil)

// Make sure we did move this time.
c.waitOnLeader()
require_NotEqual(t, ml, c.leader())
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
11 changes: 9 additions & 2 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4900,8 +4900,9 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
}
`

_, syspub := createKey(t)
sysKp, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
sysCreds := newUser(t, sysKp)

accKp, aExpPub := createKey(t)
accClaim := jwt.NewAccountClaims(aExpPub)
Expand Down Expand Up @@ -4994,6 +4995,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
// Move our R3 stream leader and make sure acounting is correct.
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(aExpPub, "TEST1")

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

Expand Down Expand Up @@ -5025,6 +5027,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(aExpPub, "TEST1")

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

Expand All @@ -5042,11 +5045,15 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

// Need system user here to move the leader.
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserCredentials(sysCreds))
defer snc.Close()

requestLeaderStepDown := func() {
ml := c.leader()
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
if cml := c.leader(); cml == ml {
nc.Request(JSApiLeaderStepDown, nil, time.Second)
snc.Request(JSApiLeaderStepDown, nil, time.Second)
return fmt.Errorf("Metaleader has not moved yet")
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3361,13 +3361,13 @@ func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) {
defer sCluster.shutdown()

requestLeaderStepDown := func(clientURL string) error {
nc, err := nats.Connect(clientURL)
nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
return err
}
defer nc.Close()

ncResp, err := nc.Request(JSApiLeaderStepDown, nil, 3*time.Second)
ncResp, err := nc.Request(JSApiLeaderStepDown, nil, time.Second)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6542,6 +6542,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) {
listen: 127.0.0.1:%d
routes = [%s]
}
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
storeCnf := func(serverName, clusterName, storeDir, conf string) string {
switch serverName {
Expand All @@ -6566,7 +6567,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) {
defer cluster.shutdown()

requestLeaderStepDown := func(clientURL string) error {
nc, err := nats.Connect(clientURL)
nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
return err
}
Expand Down
11 changes: 9 additions & 2 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1882,7 +1882,10 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
doneCh := make(chan bool)

if sl == sc.leader() {
nc.Request(JSApiLeaderStepDown, nil, time.Second)
snc, _ := jsClientConnect(t, sc.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()
_, err := snc.Request(JSApiLeaderStepDown, nil, time.Second)
require_NoError(t, err)
sc.waitOnLeader()
}

Expand Down Expand Up @@ -10315,7 +10318,11 @@ func TestNoRaceWQAndMultiSubjectFiltersRace(t *testing.T) {
return nil
}
// Move meta-leader since stream can be R1.
nc.Request(JSApiLeaderStepDown, nil, time.Second)
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()
if _, err := snc.Request(JSApiLeaderStepDown, nil, time.Second); err != nil {
return err
}
return fmt.Errorf("stream leader on meta-leader")
})

Expand Down
4 changes: 2 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,9 +727,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
mset.store.FastState(&state)

// Possible race with consumer.setLeader during recovery.
mset.mu.RLock()
mset.mu.Lock()
mset.lseq = state.LastSeq
mset.mu.RUnlock()
mset.mu.Unlock()

// If no msgs (new stream), set dedupe state loaded to true.
if state.Msgs == 0 {
Expand Down