diff --git a/.goreleaser.yml b/.goreleaser.yml index 51a0d21da8b..91905885c17 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -16,7 +16,7 @@ builds: flags: - -trimpath ldflags: - - -w -X github.com/nats-io/nats-server/v2/server.gitCommit={{.ShortCommit}} + - -w -X 'github.com/nats-io/nats-server/v2/server.gitCommit={{.ShortCommit}}' -X 'github.com/nats-io/nats-server/v2/server.serverVersion={{.Tag}}' env: - GO111MODULE=on - CGO_ENABLED=0 diff --git a/scripts/runTestsOnTravis.sh b/scripts/runTestsOnTravis.sh index 84ebe2f3e35..7561bd72dda 100755 --- a/scripts/runTestsOnTravis.sh +++ b/scripts/runTestsOnTravis.sh @@ -10,7 +10,7 @@ if [ "$1" = "compile" ]; then go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.56.1; golangci-lint run; if [ "$TRAVIS_TAG" != "" ]; then - go test -race -v -run=TestVersionMatchesTag ./server -count=1 -vet=off + go test -race -v -run=TestVersionMatchesTag ./server -ldflags="-X=github.com/nats-io/nats-server/v2/server.serverVersion=$TRAVIS_TAG" -count=1 -vet=off fi elif [ "$1" = "build_only" ]; then diff --git a/server/const.go b/server/const.go index 46747187cb3..30698663808 100644 --- a/server/const.go +++ b/server/const.go @@ -14,6 +14,7 @@ package server import ( + "runtime/debug" "time" ) @@ -33,12 +34,25 @@ const ( ) var ( - // gitCommit injected at build - gitCommit string + // gitCommit and serverVersion injected at build. + gitCommit, serverVersion string // trustedKeys is a whitespace separated array of trusted operator's public nkeys. trustedKeys string ) +func init() { + // Use build info if present, it would be if building using 'go build .' + // or when using a release. + if info, ok := debug.ReadBuildInfo(); ok { + for _, setting := range info.Settings { + switch setting.Key { + case "vcs.revision": + gitCommit = setting.Value[:7] + } + } + } +} + const ( // VERSION is the current version for the server. VERSION = "2.10.18-RC.2" diff --git a/server/filestore.go b/server/filestore.go index 9fdf0d38d25..e059d28bec4 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2862,6 +2862,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return 0, validThrough } + // If sseq is less then our first set to first. + if sseq < fs.state.FirstSeq { + sseq = fs.state.FirstSeq + } // Track starting for both block for the sseq and staring block that matches any subject. var seqStart int // See if we need to figure out starting block per sseq. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 15a888147f1..78b8f9e7d5a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5193,9 +5193,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err } else { resp.ConsumerInfo = o.initialInfo() s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) - if node := o.raftNode(); node != nil { - o.sendCreateAdvisory() - } + o.sendCreateAdvisory() } return nil diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 6b4276e9e67..0388b0817f4 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -7370,6 +7370,44 @@ func TestJetStreamClusterWorkQueueLosingMessagesOnConsumerDelete(t *testing.T) { require_Equal(t, si.State.Msgs, 40) } +func TestJetStreamClusterR1ConsumerAdvisory(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Retention: nats.LimitsPolicy, + Replicas: 3, + }) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.CONSUMER.CREATED.>") + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "c1", + AckPolicy: nats.AckExplicitPolicy, + Replicas: 3, + }) + require_NoError(t, err) + + checkSubsPending(t, sub, 1) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "c2", + AckPolicy: nats.AckExplicitPolicy, + Replicas: 1, + }) + require_NoError(t, err) + + checkSubsPending(t, sub, 2) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 858d11f0120..e8ef300f8ee 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22758,3 +22758,47 @@ func TestJetStreamBadSubjectMappingStream(t *testing.T) { require_Error(t, err, NewJSStreamUpdateError(errors.New("unable to get subject transform for source: invalid mapping destination: too many arguments passed to the function in {{wildcard(1)}}{{split(3,1)}}"))) } + +func TestJetStreamConsumerInfoNumPending(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "LIMITS", + Subjects: []string{"js.in.limits"}, + MaxMsgs: 100, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("LIMITS", &nats.ConsumerConfig{ + Name: "PULL", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + for i := 0; i < 1000; i++ { + js.Publish("js.in.limits", []byte("x")) + } + + ci, err := js.ConsumerInfo("LIMITS", "PULL") + require_NoError(t, err) + require_Equal(t, ci.NumPending, 100) + + // Now restart the server. + sd := s.JetStreamConfig().StoreDir + s.Shutdown() + // Restart. + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + ci, err = js.ConsumerInfo("LIMITS", "PULL") + require_NoError(t, err) + require_Equal(t, ci.NumPending, 100) +} diff --git a/server/monitor.go b/server/monitor.go index 4eb0e74037a..b72ee09d57f 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1387,6 +1387,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { var srcUrl string if gitCommit == _EMPTY_ { srcUrl = "https://github.com/nats-io/nats-server" + } else if serverVersion != _EMPTY_ { + srcUrl = fmt.Sprintf("https://github.com/nats-io/nats-server/tree/%s", serverVersion) } else { srcUrl = fmt.Sprintf("https://github.com/nats-io/nats-server/tree/%s", gitCommit) } diff --git a/server/server_test.go b/server/server_test.go index 8226459c742..425932cf6e4 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -129,6 +129,17 @@ func TestVersionMatchesTag(t *testing.T) { if VERSION != tag[1:] { t.Fatalf("Version (%s) does not match tag (%s)", VERSION, tag[1:]) } + // Check that the version dynamically set via ldflags matches the version + // from the server previous to releasing. + if serverVersion == _EMPTY_ { + t.Fatal("Version missing in ldflags") + } + // Unlike VERSION constant, serverVersion is prefixed with a 'v' + // since it should be the same as the git tag. + expected := "v" + VERSION + if serverVersion != _EMPTY_ && expected != serverVersion { + t.Fatalf("Version (%s) does not match ldflags version (%s)", expected, serverVersion) + } } func TestStartProfiler(t *testing.T) {