Skip to content

Commit

Permalink
[FIXED] Consumer NumPending higher than total stream messages. (#5655)
Browse files Browse the repository at this point in the history
When we matched all and had no interior deletes we would short circuit
num pending calculations but were not accounting for sseq <
state.FirstSeq.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored Jul 15, 2024
2 parents 2124d5f + 721fb55 commit b7a3df8
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
4 changes: 4 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2975,6 +2975,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
return 0, validThrough
}

// If sseq is less then our first set to first.
if sseq < fs.state.FirstSeq {
sseq = fs.state.FirstSeq
}
// Track starting for both block for the sseq and staring block that matches any subject.
var seqStart int
// See if we need to figure out starting block per sseq.
Expand Down
44 changes: 44 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23763,3 +23763,47 @@ func TestJetStreamBadSubjectMappingStream(t *testing.T) {

require_Error(t, err, NewJSStreamUpdateError(errors.New("unable to get subject transform for source: invalid mapping destination: too many arguments passed to the function in {{wildcard(1)}}{{split(3,1)}}")))
}

func TestJetStreamConsumerInfoNumPending(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "LIMITS",
Subjects: []string{"js.in.limits"},
MaxMsgs: 100,
})
require_NoError(t, err)

_, err = js.AddConsumer("LIMITS", &nats.ConsumerConfig{
Name: "PULL",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

for i := 0; i < 1000; i++ {
js.Publish("js.in.limits", []byte("x"))
}

ci, err := js.ConsumerInfo("LIMITS", "PULL")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 100)

// Now restart the server.
sd := s.JetStreamConfig().StoreDir
s.Shutdown()
// Restart.
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()

nc, js = jsClientConnect(t, s)
defer nc.Close()

ci, err = js.ConsumerInfo("LIMITS", "PULL")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 100)
}

0 comments on commit b7a3df8

Please sign in to comment.