Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Issue #2397 #2480

Merged
merged 5 commits
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,26 @@ const (
JsDefaultMaxAckPending = 20_000
)

// setConsumerConfigDefaults fills in default values for any consumer
// config fields the caller left unset, using the Js* constants above.
// It mutates cfg in place.
func setConsumerConfigDefaults(cfg *ConsumerConfig) {
	// Both explicit and all ack policies require acking, so they share defaults.
	requiresAck := cfg.AckPolicy == AckExplicit || cfg.AckPolicy == AckAll
	// A pull consumer (no deliver subject) gets a default wait queue size.
	if cfg.DeliverSubject == _EMPTY_ && cfg.MaxWaiting == 0 {
		cfg.MaxWaiting = JSWaitQueueDefaultMax
	}
	// When acks are required, give the ack wait a sane default.
	if requiresAck && cfg.AckWait == 0 {
		cfg.AckWait = JsAckWaitDefault
	}
	// Zero means unspecified for MaxDeliver; -1 means no limit.
	if cfg.MaxDeliver == 0 {
		cfg.MaxDeliver = -1
	}
	// When acks are required, cap outstanding unacked messages by default.
	if requiresAck && cfg.MaxAckPending == 0 {
		cfg.MaxAckPending = JsDefaultMaxAckPending
	}
}

// addConsumer creates a consumer on this stream from the given config.
// It delegates to addConsumerWithAssignment with no explicit name and
// no cluster assignment.
func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
	return mset.addConsumerWithAssignment(config, _EMPTY_, nil)
}
Expand All @@ -291,6 +311,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, NewJSConsumerConfigRequiredError()
}

// Make sure we have sane defaults.
setConsumerConfigDefaults(config)

if len(config.Description) > JSMaxDescriptionLen {
return nil, NewJSConsumerDescriptionTooLongError(JSMaxDescriptionLen)
}
Expand Down Expand Up @@ -329,10 +352,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if config.MaxWaiting < 0 {
return nil, NewJSConsumerMaxWaitingNegativeError()
}
// Set to default if not specified.
if config.MaxWaiting == 0 {
config.MaxWaiting = JSWaitQueueDefaultMax
}
if config.Heartbeat > 0 {
return nil, NewJSConsumerHBRequiresPushError()
}
Expand All @@ -354,19 +373,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
}

// Setup proper default for ack wait if we are in explicit ack mode.
if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) {
config.AckWait = JsAckWaitDefault
}
// Setup default of -1, meaning no limit for MaxDeliver.
if config.MaxDeliver == 0 {
config.MaxDeliver = -1
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
config.MaxAckPending = JsDefaultMaxAckPending
}

// As best we can make sure the filtered subject is valid.
if config.FilterSubject != _EMPTY_ {
subjects, hasExt := mset.allSubjects()
Expand Down
11 changes: 10 additions & 1 deletion server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3045,6 +3045,9 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
return
}

// Make sure we have sane defaults.
setConsumerConfigDefaults(&req.Config)

// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
if req.Config.Direct {
Expand Down Expand Up @@ -3422,9 +3425,15 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
// Delaying an error response gives the leader a chance to respond before us
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group)
} else if ca != nil {
js.mu.RLock()
var node RaftNode
if rg := ca.Group; rg != nil && rg.node != nil && rg.isMember(ourID) {
node = rg.node
}
js.mu.RUnlock()
if node != nil {
// Check here if we are a member and this is just a new consumer that does not have a leader yet.
if rg.node.GroupLeader() == _EMPTY_ && !rg.node.HadPreviousLeader() {
if node.GroupLeader() == _EMPTY_ && !node.HadPreviousLeader() {
resp.Error = NewJSConsumerNotFoundError()
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
}
Expand Down
57 changes: 43 additions & 14 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
return true
}
}

return false
}

Expand Down Expand Up @@ -1295,17 +1296,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {

// Make sure we do not leave the apply channel to fill up and block the raft layer.
defer func() {
if n.State() != Closed {
if n.Leader() {
n.StepDown()
}
// Drain the commit channel..
for len(ach) > 0 {
select {
case <-ach:
default:
return
}
if n.State() == Closed {
return
}
if n.Leader() {
n.StepDown()
}
// Drain the commit channel..
for len(ach) > 0 {
select {
case <-ach:
default:
return
}
}
}()
Expand Down Expand Up @@ -1529,11 +1531,35 @@ func (mset *stream) resetClusteredState() bool {
js.mu.Lock()
sa.Group.node = nil
js.mu.Unlock()
go js.processClusterCreateStream(acc, sa)
go js.restartClustered(acc, sa)
}
return true
}

// restartClustered re-creates a clustered stream and then re-creates any of
// its consumers that this node is a member of. Intended to run in its own
// goroutine after a hard stream state reset.
func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
	// Re-create the stream itself first.
	js.processClusterCreateStream(acc, sa)

	// Under the lock, collect the consumer assignments we participate in,
	// clearing their raft node state so they are rebuilt from scratch.
	var toRestart []*consumerAssignment
	js.mu.Lock()
	if cc := js.cluster; cc != nil && cc.meta != nil {
		selfID := cc.meta.ID()
		for _, ca := range sa.consumers {
			if rg := ca.Group; rg != nil && rg.isMember(selfID) {
				rg.node = nil // Erase group raft/node state.
				toRestart = append(toRestart, ca)
			}
		}
	}
	js.mu.Unlock()

	// Re-create outside of the lock.
	for _, ca := range toRestart {
		js.processClusterCreateConsumer(ca, nil)
	}
}

func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {
for _, e := range ce.Entries {
if e.Type == EntryNormal {
Expand Down Expand Up @@ -1561,7 +1587,9 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

// We can skip if we know this is less than what we already have.
if lseq < last {
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
if !isRecovering {
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
}
continue
}

Expand Down Expand Up @@ -4042,8 +4070,9 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
} else {
oname = cfg.Durable
if ca := sa.consumers[oname]; ca != nil && !ca.deleted {
isPull := ca.Config.DeliverSubject == _EMPTY_
// This can be ok if delivery subject update.
shouldErr := !reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config) || ca.pending
shouldErr := isPull || ca.pending || (!reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config))
if !shouldErr {
rr := acc.sl.Match(ca.Config.DeliverSubject)
shouldErr = len(rr.psubs)+len(rr.qsubs) != 0
Expand Down
178 changes: 177 additions & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"path"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -7763,7 +7765,7 @@ func TestJetStreamPullConsumerLeakedSubs(t *testing.T) {
defer sub.Unsubscribe()

// Load up a bunch of requests.
numRequests := 20 //100_000
numRequests := 20
for i := 0; i < numRequests; i++ {
js.PublishAsync("Domains.Domain", []byte("QUESTION"))
}
Expand Down Expand Up @@ -8026,6 +8028,11 @@ func TestJetStreamRaceOnRAFTCreate(t *testing.T) {
t.Fatalf("Error creating stream: %v", err)
}

js, err = nc.JetStream(nats.MaxWait(time.Second))
if err != nil {
t.Fatal(err)
}

size := 10
wg := sync.WaitGroup{}
wg.Add(size)
Expand Down Expand Up @@ -8078,6 +8085,175 @@ func TestJetStreamDeadlockOnVarz(t *testing.T) {
wg.Wait()
}

// Make sure when we try to hard reset a stream state in a cluster that we also re-create the consumers.
func TestJetStreamClusterStreamReset(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Client based API
	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	// R2 work-queue stream so consumer state matters after a reset.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo.*"},
		Replicas:  2,
		Retention: nats.WorkQueuePolicy,
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Seed the stream with messages.
	numRequests := 20
	for i := 0; i < numRequests; i++ {
		js.Publish("foo.created", []byte("REQ"))
	}

	// Durable.
	sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer sub.Unsubscribe()

	si, err := js.StreamInfo("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if si.State.Msgs != uint64(numRequests) {
		t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
	}
	// Let settle a bit.
	time.Sleep(250 * time.Millisecond)

	// Grab number go routines.
	base := runtime.NumGoroutine()

	// Grab a server that is the consumer leader for the durable.
	cl := c.consumerLeader("$G", "TEST", "d1")
	mset, err := cl.GlobalAccount().lookupStream("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// Do a hard reset here by hand.
	mset.resetClusteredState()
	// Wait til we have the leader elected.
	c.waitOnConsumerLeader("$G", "TEST", "d1")

	// So do not wait 10s in call in checkFor.
	js2, _ := nc.JetStream(nats.MaxWait(100 * time.Millisecond))
	// Make sure we can get the consumer info eventually.
	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
		_, err := js2.ConsumerInfo("TEST", "d1")
		return err
	})

	// Grab number go routines.
	// NOTE(review): this fails when the count DROPPED below base, i.e. when
	// the reset lost consumer goroutines instead of re-creating them. It does
	// not catch goroutine leaks (after > base) — confirm that is intended.
	if after := runtime.NumGoroutine(); base > after {
		t.Fatalf("Expected %d go routines, got %d", base, after)
	}
}

// Issue #2397
// Verify a replica whose on-disk stream state and raft snapshots were wiped
// can still catch up from the leader after a full cluster restart.
func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R2S", 2)
	defer c.shutdown()

	// Client based API
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo.*"},
		Replicas: 2,
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Hold onto servers.
	sl := c.streamLeader("$G", "TEST")
	if sl == nil {
		t.Fatalf("Did not get a server")
	}
	nsl := c.randomNonStreamLeader("$G", "TEST")
	if nsl == nil {
		t.Fatalf("Did not get a server")
	}
	// Grab low level stream and raft node.
	mset, err := nsl.GlobalAccount().lookupStream("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	node := mset.raftNode()
	if node == nil {
		t.Fatalf("Could not get stream group name")
	}
	gname := node.Group()

	numRequests := 100
	for i := 0; i < numRequests; i++ {
		// This will force a snapshot which will prune the normal log.
		// We will remove the snapshot to simulate the error condition.
		if i == 10 {
			if err := node.InstallSnapshot(mset.stateSnapshot()); err != nil {
				t.Fatalf("Error installing snapshot: %v", err)
			}
		}
		js.Publish("foo.created", []byte("REQ"))
	}

	// Capture store directories before shutdown so we can mangle state on disk.
	config := nsl.JetStreamConfig()
	if config == nil {
		t.Fatalf("No config")
	}
	lconfig := sl.JetStreamConfig()
	if lconfig == nil {
		t.Fatalf("No config")
	}

	nc.Close()
	c.stopAll()
	// Remove all state by truncating for the non-leader.
	for _, fn := range []string{"1.blk", "1.idx", "1.fss"} {
		fname := path.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn)
		fd, err := os.OpenFile(fname, os.O_RDWR, defaultFilePerms)
		if err != nil {
			continue
		}
		fd.Truncate(0)
		fd.Close()
	}
	// For both make sure we have no raft snapshots.
	snapDir := path.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots")
	os.RemoveAll(snapDir)
	snapDir = path.Join(config.StoreDir, "$SYS", "_js_", gname, "snapshots")
	os.RemoveAll(snapDir)

	// Now restart.
	c.restartAll()
	for _, cs := range c.servers {
		c.waitOnStreamCurrent(cs, "$G", "TEST")
	}

	nc, js = jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// One more publish on top of the 100 seeded above.
	if _, err := js.Publish("foo.created", []byte("REQ")); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// 100 seeded + 1 post-restart publish = last sequence 101.
	si, err := js.StreamInfo("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if si.State.LastSeq != 101 {
		t.Fatalf("bad state after restart: %+v", si.State)
	}
}

// Support functions

// Used to setup superclusters for tests.
Expand Down
Loading