diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index 3e4878841..8f118f931 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -740,7 +740,7 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord doReset: make(chan struct{}, 1), } if cfg.OptStartSeq != 0 { - oc.cursor.streamSeq = cfg.OptStartSeq - 1 + oc.cursor.streamSeq = cfg.OptStartSeq } err := oc.reset() if err != nil { diff --git a/jetstream/ordered.go b/jetstream/ordered.go index fd7fe2f50..18aff2afc 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -487,6 +487,7 @@ func (c *orderedConsumer) reset() error { break } } + seq := c.cursor.streamSeq + 1 c.cursor.deliverSeq = 0 consumerConfig := c.getConsumerConfigForSeq(seq) @@ -549,6 +550,8 @@ func (c *orderedConsumer) getConsumerConfigForSeq(seq uint64) *ConsumerConfig { c.cfg.DeliverPolicy == DeliverAllPolicy { cfg.OptStartSeq = 0 + } else { + cfg.OptStartSeq = c.cfg.OptStartSeq } if cfg.DeliverPolicy == DeliverLastPerSubjectPolicy && len(c.cfg.FilterSubjects) == 0 { diff --git a/jetstream/stream.go b/jetstream/stream.go index 01c9d58e9..2b14bc120 100644 --- a/jetstream/stream.go +++ b/jetstream/stream.go @@ -274,7 +274,7 @@ func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) doReset: make(chan struct{}, 1), } if cfg.OptStartSeq != 0 { - oc.cursor.streamSeq = cfg.OptStartSeq - 1 + oc.cursor.streamSeq = cfg.OptStartSeq } err := oc.reset() if err != nil { diff --git a/jetstream/test/ordered_test.go b/jetstream/test/ordered_test.go index c8b529f16..31747d27b 100644 --- a/jetstream/test/ordered_test.go +++ b/jetstream/test/ordered_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "reflect" "sync" "testing" "time" @@ -1548,3 +1549,138 @@ func TestOrderedConsumerNextOrder(t *testing.T) { } } } + +func TestOrderedConsumerConfig(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + tests := []struct { + name string + config jetstream.OrderedConsumerConfig + expected jetstream.ConsumerConfig + }{ + { + name: "default config", + config: jetstream.OrderedConsumerConfig{}, + expected: jetstream.ConsumerConfig{ + DeliverPolicy: jetstream.DeliverAllPolicy, + AckPolicy: jetstream.AckNonePolicy, + MaxDeliver: -1, + MaxWaiting: 512, + InactiveThreshold: 5 * time.Minute, + Replicas: 1, + MemoryStorage: true, + }, + }, + { + name: "custom inactive threshold", + config: jetstream.OrderedConsumerConfig{ + InactiveThreshold: 10 * time.Second, + }, + expected: jetstream.ConsumerConfig{ + DeliverPolicy: jetstream.DeliverAllPolicy, + AckPolicy: jetstream.AckNonePolicy, + MaxDeliver: -1, + MaxWaiting: 512, + InactiveThreshold: 10 * time.Second, + Replicas: 1, + MemoryStorage: true, + }, + }, + { + name: "custom opt start seq and inactive threshold", + config: jetstream.OrderedConsumerConfig{ + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + OptStartSeq: 10, + InactiveThreshold: 10 * time.Second, + }, + expected: jetstream.ConsumerConfig{ + OptStartSeq: 10, + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + AckPolicy: jetstream.AckNonePolicy, + MaxDeliver: -1, + MaxWaiting: 512, + InactiveThreshold: 10 * time.Second, + Replicas: 1, + MemoryStorage: true, + }, + }, + { + name: "all fields customized, start with custom seq", + config: jetstream.OrderedConsumerConfig{ + FilterSubjects: []string{"foo.a", "foo.b"}, + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + OptStartSeq: 10, + ReplayPolicy: jetstream.ReplayOriginalPolicy, + InactiveThreshold: 10 * time.Second, + HeadersOnly: true, + }, + expected: jetstream.ConsumerConfig{ + FilterSubjects: []string{"foo.a", "foo.b"}, + OptStartSeq: 10, + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + AckPolicy: jetstream.AckNonePolicy, + MaxDeliver: -1, + MaxWaiting: 512, + InactiveThreshold: 10 * time.Second, + Replicas: 1, + MemoryStorage: true, + HeadersOnly: true, + }, + }, + { + name: "all fields customized, start with custom time", + config: jetstream.OrderedConsumerConfig{ + FilterSubjects: []string{"foo.a", "foo.b"}, + DeliverPolicy: jetstream.DeliverByStartTimePolicy, + OptStartTime: &time.Time{}, + ReplayPolicy: jetstream.ReplayOriginalPolicy, + InactiveThreshold: 10 * time.Second, + HeadersOnly: true, + }, + expected: jetstream.ConsumerConfig{ + FilterSubjects: []string{"foo.a", "foo.b"}, + OptStartTime: &time.Time{}, + DeliverPolicy: jetstream.DeliverByStartTimePolicy, + AckPolicy: jetstream.AckNonePolicy, + MaxDeliver: -1, + MaxWaiting: 512, + InactiveThreshold: 10 * time.Second, + Replicas: 1, + MemoryStorage: true, + HeadersOnly: true, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c, err := s.OrderedConsumer(context.Background(), test.config) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + cfg := c.CachedInfo().Config + test.expected.Name = cfg.Name + + if !reflect.DeepEqual(test.expected, cfg) { + t.Fatalf("Expected config %+v, got %+v", test.expected, cfg) + } + }) + } +}