Skip to content

Commit

Permalink
Cherry-picks for 2.10.18-RC.3 (#5657)
Browse files Browse the repository at this point in the history
Includes:

- #5649
- #5651
- #5650
- #5655

---------

Signed-off-by: Derek Collison <derek@nats.io>
Signed-off-by: Waldemar Quevedo <wally@nats.io>
Co-authored-by: Derek Collison <derek@nats.io>
  • Loading branch information
wallyqs and derekcollison authored Jul 15, 2024
1 parent 0def236 commit 83e8895
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ builds:
flags:
- -trimpath
ldflags:
- -w -X github.com/nats-io/nats-server/v2/server.gitCommit={{.ShortCommit}}
- -w -X 'github.com/nats-io/nats-server/v2/server.gitCommit={{.ShortCommit}}' -X 'github.com/nats-io/nats-server/v2/server.serverVersion={{.Tag}}'
env:
- GO111MODULE=on
- CGO_ENABLED=0
Expand Down
2 changes: 1 addition & 1 deletion scripts/runTestsOnTravis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ if [ "$1" = "compile" ]; then
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.56.1;
golangci-lint run;
if [ "$TRAVIS_TAG" != "" ]; then
go test -race -v -run=TestVersionMatchesTag ./server -count=1 -vet=off
go test -race -v -run=TestVersionMatchesTag ./server -ldflags="-X=github.com/nats-io/nats-server/v2/server.serverVersion=$TRAVIS_TAG" -count=1 -vet=off
fi

elif [ "$1" = "build_only" ]; then
Expand Down
18 changes: 16 additions & 2 deletions server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"runtime/debug"
"time"
)

Expand All @@ -33,12 +34,25 @@ const (
)

var (
// gitCommit injected at build
gitCommit string
// gitCommit and serverVersion injected at build.
gitCommit, serverVersion string
// trustedKeys is a whitespace separated array of trusted operator's public nkeys.
trustedKeys string
)

func init() {
// Use build info if present, it would be if building using 'go build .'
// or when using a release.
if info, ok := debug.ReadBuildInfo(); ok {
for _, setting := range info.Settings {
switch setting.Key {
case "vcs.revision":
gitCommit = setting.Value[:7]
}
}
}
}

const (
// VERSION is the current version for the server.
VERSION = "2.10.18-RC.2"
Expand Down
4 changes: 4 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2862,6 +2862,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
return 0, validThrough
}

// If sseq is less then our first set to first.
if sseq < fs.state.FirstSeq {
sseq = fs.state.FirstSeq
}
// Track starting for both block for the sseq and staring block that matches any subject.
var seqStart int
// See if we need to figure out starting block per sseq.
Expand Down
4 changes: 1 addition & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5193,9 +5193,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
} else {
resp.ConsumerInfo = o.initialInfo()
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
if node := o.raftNode(); node != nil {
o.sendCreateAdvisory()
}
o.sendCreateAdvisory()
}

return nil
Expand Down
38 changes: 38 additions & 0 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7370,6 +7370,44 @@ func TestJetStreamClusterWorkQueueLosingMessagesOnConsumerDelete(t *testing.T) {
require_Equal(t, si.State.Msgs, 40)
}

func TestJetStreamClusterR1ConsumerAdvisory(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3F", 3)
defer c.shutdown()

s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Retention: nats.LimitsPolicy,
Replicas: 3,
})
require_NoError(t, err)

sub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.CONSUMER.CREATED.>")
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "c1",
AckPolicy: nats.AckExplicitPolicy,
Replicas: 3,
})
require_NoError(t, err)

checkSubsPending(t, sub, 1)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "c2",
AckPolicy: nats.AckExplicitPolicy,
Replicas: 1,
})
require_NoError(t, err)

checkSubsPending(t, sub, 2)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
44 changes: 44 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22758,3 +22758,47 @@ func TestJetStreamBadSubjectMappingStream(t *testing.T) {

require_Error(t, err, NewJSStreamUpdateError(errors.New("unable to get subject transform for source: invalid mapping destination: too many arguments passed to the function in {{wildcard(1)}}{{split(3,1)}}")))
}

func TestJetStreamConsumerInfoNumPending(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "LIMITS",
Subjects: []string{"js.in.limits"},
MaxMsgs: 100,
})
require_NoError(t, err)

_, err = js.AddConsumer("LIMITS", &nats.ConsumerConfig{
Name: "PULL",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

for i := 0; i < 1000; i++ {
js.Publish("js.in.limits", []byte("x"))
}

ci, err := js.ConsumerInfo("LIMITS", "PULL")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 100)

// Now restart the server.
sd := s.JetStreamConfig().StoreDir
s.Shutdown()
// Restart.
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()

nc, js = jsClientConnect(t, s)
defer nc.Close()

ci, err = js.ConsumerInfo("LIMITS", "PULL")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 100)
}
2 changes: 2 additions & 0 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
var srcUrl string
if gitCommit == _EMPTY_ {
srcUrl = "https://github.com/nats-io/nats-server"
} else if serverVersion != _EMPTY_ {
srcUrl = fmt.Sprintf("https://github.com/nats-io/nats-server/tree/%s", serverVersion)
} else {
srcUrl = fmt.Sprintf("https://github.com/nats-io/nats-server/tree/%s", gitCommit)
}
Expand Down
11 changes: 11 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ func TestVersionMatchesTag(t *testing.T) {
if VERSION != tag[1:] {
t.Fatalf("Version (%s) does not match tag (%s)", VERSION, tag[1:])
}
// Check that the version dynamically set via ldflags matches the version
// from the server previous to releasing.
if serverVersion == _EMPTY_ {
t.Fatal("Version missing in ldflags")
}
// Unlike VERSION constant, serverVersion is prefixed with a 'v'
// since it should be the same as the git tag.
expected := "v" + VERSION
if serverVersion != _EMPTY_ && expected != serverVersion {
t.Fatalf("Version (%s) does not match ldflags version (%s)", expected, serverVersion)
}
}

func TestStartProfiler(t *testing.T) {
Expand Down

0 comments on commit 83e8895

Please sign in to comment.