diff --git a/server/filestore.go b/server/filestore.go index 6e10ede8e92..b034ac77b64 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2975,6 +2975,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return 0, validThrough } + // If sseq is less then our first set to first. + if sseq < fs.state.FirstSeq { + sseq = fs.state.FirstSeq + } // Track starting for both block for the sseq and staring block that matches any subject. var seqStart int // See if we need to figure out starting block per sseq. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index d153255a525..a4559cc802e 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -23763,3 +23763,47 @@ func TestJetStreamBadSubjectMappingStream(t *testing.T) { require_Error(t, err, NewJSStreamUpdateError(errors.New("unable to get subject transform for source: invalid mapping destination: too many arguments passed to the function in {{wildcard(1)}}{{split(3,1)}}"))) } + +func TestJetStreamConsumerInfoNumPending(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "LIMITS", + Subjects: []string{"js.in.limits"}, + MaxMsgs: 100, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("LIMITS", &nats.ConsumerConfig{ + Name: "PULL", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + for i := 0; i < 1000; i++ { + js.Publish("js.in.limits", []byte("x")) + } + + ci, err := js.ConsumerInfo("LIMITS", "PULL") + require_NoError(t, err) + require_Equal(t, ci.NumPending, 100) + + // Now restart the server. + sd := s.JetStreamConfig().StoreDir + s.Shutdown() + // Restart. + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + ci, err = js.ConsumerInfo("LIMITS", "PULL") + require_NoError(t, err) + require_Equal(t, ci.NumPending, 100) +}