From 62d4521dcf6caa1a1f660b59ed4cab710f4e083d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 22 Sep 2024 12:50:28 -0700 Subject: [PATCH 1/3] Improved meta stepdown and limit to system account always. Signed-off-by: Derek Collison --- server/jetstream_api.go | 6 +++++ server/jetstream_cluster.go | 5 ++-- server/jetstream_cluster_1_test.go | 37 ++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 4b96617401b..cc6892b7dd3 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2810,6 +2810,12 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun return } + // This should only be coming from the System Account. + if acc != s.SystemAccount() { + s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User) + return + } + js, cc := s.getJetStreamCluster() if js == nil || cc == nil || cc.meta == nil { return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4b74f765a19..782188bacfb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -583,10 +583,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum return false } s := js.srv - js.mu.RUnlock() - // Capture RAFT node from assignment. node := ca.Group.node + js.mu.RUnlock() // When we try to restart we nil out the node if applicable // and reprocess the consumer assignment. 
@@ -1039,7 +1038,7 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool { ourID := cc.meta.ID() for _, peer := range rg.Peers { if peer == ourID { - if len(rg.Peers) == 1 || rg.node != nil && rg.node.Leader() { + if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) { return true } } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 237c26db9f5..d93badc7827 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6405,6 +6405,43 @@ func TestJetStreamClusterConsumerDeleteInterestPolicyPerf(t *testing.T) { require_Equal(t, si.State.Msgs, 0) } +// Make sure to not allow non-system accounts to move meta leader. +func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + s := c.randomServer() + + // Client based API + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + ml := c.leader() + + _, err := nc.Request(JSApiLeaderStepDown, nil, time.Second) + require_Error(t, err, nats.ErrTimeout) + + // Make sure we did not move. + c.waitOnLeader() + require_Equal(t, ml, c.leader()) + + // System user can move it. + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + + resp, err := snc.Request(JSApiLeaderStepDown, nil, time.Second) + require_NoError(t, err) + + var sdr JSApiLeaderStepDownResponse + require_NoError(t, json.Unmarshal(resp.Data, &sdr)) + require_True(t, sdr.Success) + require_Equal(t, sdr.Error, nil) + + // Make sure we did move this time. + c.waitOnLeader() + require_NotEqual(t, ml, c.leader()) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value. 
From 02dbf738e6cb021448f89f6df569d4047e27bdca Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 22 Sep 2024 13:30:09 -0700 Subject: [PATCH 2/3] Need write lock here Signed-off-by: Derek Collison --- server/stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/stream.go b/server/stream.go index aee4b347239..48ede9c6a3a 100644 --- a/server/stream.go +++ b/server/stream.go @@ -727,9 +727,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt mset.store.FastState(&state) // Possible race with consumer.setLeader during recovery. - mset.mu.RLock() + mset.mu.Lock() mset.lseq = state.LastSeq - mset.mu.RUnlock() + mset.mu.Unlock() // If no msgs (new stream), set dedupe state loaded to true. if state.Msgs == 0 { From dfa8484b6668b8c87da1cda1362a5794aa1b7761 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 22 Sep 2024 13:31:25 -0700 Subject: [PATCH 3/3] Fixup for tests Signed-off-by: Derek Collison --- server/jetstream_cluster_3_test.go | 11 +++++++++-- server/jetstream_super_cluster_test.go | 4 ++-- server/jetstream_test.go | 3 ++- server/norace_test.go | 11 +++++++++-- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 3317ccaab2c..045aa5c1c79 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4900,8 +4900,9 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) { } ` - _, syspub := createKey(t) + sysKp, syspub := createKey(t) sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(t, sysKp) accKp, aExpPub := createKey(t) accClaim := jwt.NewAccountClaims(aExpPub) @@ -4994,6 +4995,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) { // Move our R3 stream leader and make sure acounting is correct. 
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second) require_NoError(t, err) + c.waitOnStreamLeader(aExpPub, "TEST1") checkAccount(sir1.State.Bytes, sir3.State.Bytes) @@ -5025,6 +5027,7 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) { Replicas: 3, }) require_NoError(t, err) + c.waitOnStreamLeader(aExpPub, "TEST1") checkAccount(sir1.State.Bytes, sir3.State.Bytes) @@ -5042,11 +5045,15 @@ func TestJetStreamClusterAccountUsageDrifts(t *testing.T) { checkAccount(sir1.State.Bytes, sir3.State.Bytes) + // Need system user here to move the leader. + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserCredentials(sysCreds)) + defer snc.Close() + requestLeaderStepDown := func() { ml := c.leader() checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { if cml := c.leader(); cml == ml { - nc.Request(JSApiLeaderStepDown, nil, time.Second) + snc.Request(JSApiLeaderStepDown, nil, time.Second) return fmt.Errorf("Metaleader has not moved yet") } return nil diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index ffd7c122818..f12eaa375d2 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -3361,13 +3361,13 @@ func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) { defer sCluster.shutdown() requestLeaderStepDown := func(clientURL string) error { - nc, err := nats.Connect(clientURL) + nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!")) if err != nil { return err } defer nc.Close() - ncResp, err := nc.Request(JSApiLeaderStepDown, nil, 3*time.Second) + ncResp, err := nc.Request(JSApiLeaderStepDown, nil, time.Second) if err != nil { return err } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 472cec3f453..c88e5b23684 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -6542,6 +6542,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) { listen: 127.0.0.1:%d routes 
= [%s] } + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } ` storeCnf := func(serverName, clusterName, storeDir, conf string) string { switch serverName { @@ -6566,7 +6567,7 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) { defer cluster.shutdown() requestLeaderStepDown := func(clientURL string) error { - nc, err := nats.Connect(clientURL) + nc, err := nats.Connect(clientURL, nats.UserInfo("admin", "s3cr3t!")) if err != nil { return err } diff --git a/server/norace_test.go b/server/norace_test.go index f41aee25ed8..5e01b6b9ed0 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1882,7 +1882,10 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) { doneCh := make(chan bool) if sl == sc.leader() { - nc.Request(JSApiLeaderStepDown, nil, time.Second) + snc, _ := jsClientConnect(t, sc.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + _, err := snc.Request(JSApiLeaderStepDown, nil, time.Second) + require_NoError(t, err) sc.waitOnLeader() } @@ -10315,7 +10318,11 @@ func TestNoRaceWQAndMultiSubjectFiltersRace(t *testing.T) { return nil } // Move meta-leader since stream can be R1. - nc.Request(JSApiLeaderStepDown, nil, time.Second) + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + if _, err := snc.Request(JSApiLeaderStepDown, nil, time.Second); err != nil { + return err + } return fmt.Errorf("stream leader on meta-leader") })