Skip to content

Commit

Permalink
[FIXED] Set OptStartSeq correctly in OrderedConsumerConfig (#1644)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Aug 15, 2024
1 parent b04a883 commit b1fa8c7
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 2 deletions.
2 changes: 1 addition & 1 deletion jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord
doReset: make(chan struct{}, 1),
}
if cfg.OptStartSeq != 0 {
oc.cursor.streamSeq = cfg.OptStartSeq - 1
oc.cursor.streamSeq = cfg.OptStartSeq
}
err := oc.reset()
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ func (c *orderedConsumer) reset() error {
break
}
}

seq := c.cursor.streamSeq + 1
c.cursor.deliverSeq = 0
consumerConfig := c.getConsumerConfigForSeq(seq)
Expand Down Expand Up @@ -549,6 +550,8 @@ func (c *orderedConsumer) getConsumerConfigForSeq(seq uint64) *ConsumerConfig {
c.cfg.DeliverPolicy == DeliverAllPolicy {

cfg.OptStartSeq = 0
} else {
cfg.OptStartSeq = c.cfg.OptStartSeq
}

if cfg.DeliverPolicy == DeliverLastPerSubjectPolicy && len(c.cfg.FilterSubjects) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig)
doReset: make(chan struct{}, 1),
}
if cfg.OptStartSeq != 0 {
oc.cursor.streamSeq = cfg.OptStartSeq - 1
oc.cursor.streamSeq = cfg.OptStartSeq
}
err := oc.reset()
if err != nil {
Expand Down
136 changes: 136 additions & 0 deletions jetstream/test/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1548,3 +1549,138 @@ func TestOrderedConsumerNextOrder(t *testing.T) {
}
}
}

func TestOrderedConsumerConfig(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

tests := []struct {
name string
config jetstream.OrderedConsumerConfig
expected jetstream.ConsumerConfig
}{
{
name: "default config",
config: jetstream.OrderedConsumerConfig{},
expected: jetstream.ConsumerConfig{
DeliverPolicy: jetstream.DeliverAllPolicy,
AckPolicy: jetstream.AckNonePolicy,
MaxDeliver: -1,
MaxWaiting: 512,
InactiveThreshold: 5 * time.Minute,
Replicas: 1,
MemoryStorage: true,
},
},
{
name: "custom inactive threshold",
config: jetstream.OrderedConsumerConfig{
InactiveThreshold: 10 * time.Second,
},
expected: jetstream.ConsumerConfig{
DeliverPolicy: jetstream.DeliverAllPolicy,
AckPolicy: jetstream.AckNonePolicy,
MaxDeliver: -1,
MaxWaiting: 512,
InactiveThreshold: 10 * time.Second,
Replicas: 1,
MemoryStorage: true,
},
},
{
name: "custom opt start seq and inactive threshold",
config: jetstream.OrderedConsumerConfig{
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
OptStartSeq: 10,
InactiveThreshold: 10 * time.Second,
},
expected: jetstream.ConsumerConfig{
OptStartSeq: 10,
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
AckPolicy: jetstream.AckNonePolicy,
MaxDeliver: -1,
MaxWaiting: 512,
InactiveThreshold: 10 * time.Second,
Replicas: 1,
MemoryStorage: true,
},
},
{
name: "all fields customized, start with custom seq",
config: jetstream.OrderedConsumerConfig{
FilterSubjects: []string{"foo.a", "foo.b"},
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
OptStartSeq: 10,
ReplayPolicy: jetstream.ReplayOriginalPolicy,
InactiveThreshold: 10 * time.Second,
HeadersOnly: true,
},
expected: jetstream.ConsumerConfig{
FilterSubjects: []string{"foo.a", "foo.b"},
OptStartSeq: 10,
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
AckPolicy: jetstream.AckNonePolicy,
MaxDeliver: -1,
MaxWaiting: 512,
InactiveThreshold: 10 * time.Second,
Replicas: 1,
MemoryStorage: true,
HeadersOnly: true,
},
},
{
name: "all fields customized, start with custom time",
config: jetstream.OrderedConsumerConfig{
FilterSubjects: []string{"foo.a", "foo.b"},
DeliverPolicy: jetstream.DeliverByStartTimePolicy,
OptStartTime: &time.Time{},
ReplayPolicy: jetstream.ReplayOriginalPolicy,
InactiveThreshold: 10 * time.Second,
HeadersOnly: true,
},
expected: jetstream.ConsumerConfig{
FilterSubjects: []string{"foo.a", "foo.b"},
OptStartTime: &time.Time{},
DeliverPolicy: jetstream.DeliverByStartTimePolicy,
AckPolicy: jetstream.AckNonePolicy,
MaxDeliver: -1,
MaxWaiting: 512,
InactiveThreshold: 10 * time.Second,
Replicas: 1,
MemoryStorage: true,
HeadersOnly: true,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c, err := s.OrderedConsumer(context.Background(), test.config)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

cfg := c.CachedInfo().Config
test.expected.Name = cfg.Name

if !reflect.DeepEqual(test.expected, cfg) {
t.Fatalf("Expected config %+v, got %+v", test.expected, cfg)
}
})
}
}

0 comments on commit b1fa8c7

Please sign in to comment.