Skip to content

Commit

Permalink
[FIXED] Ordered Consumer for Next (#1472)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Co-authored-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
Jarema and piotrpio authored Nov 28, 2023
1 parent bb64e1b commit 48e070d
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 6 deletions.
15 changes: 9 additions & 6 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,14 +734,9 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
defer sub.subscription.Unsubscribe()
defer close(res.msgs)
for {
if receivedMsgs == req.Batch || (req.MaxBytes != 0 && receivedBytes == req.MaxBytes) {
p.Lock()
res.done = true
p.Unlock()
return
}
select {
case msg := <-msgs:
p.Lock()
if hbTimer != nil {
hbTimer.Reset(2 * req.Heartbeat)
}
Expand All @@ -752,9 +747,11 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
res.err = err
}
res.done = true
p.Unlock()
return
}
if !userMsg {
p.Unlock()
continue
}
res.msgs <- p.jetStream.toJSMsg(msg)
Expand All @@ -767,6 +764,12 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
if req.MaxBytes != 0 {
receivedBytes += msg.Size()
}
if receivedMsgs == req.Batch || (req.MaxBytes != 0 && receivedBytes >= req.MaxBytes) {
res.done = true
p.Unlock()
return
}
p.Unlock()
case <-time.After(req.Expires + 1*time.Second):
res.done = true
return
Expand Down
60 changes: 60 additions & 0 deletions jetstream/test/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package test
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1393,3 +1394,62 @@ func TestOrderedConsumerNextTimeout(t *testing.T) {
t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err)
}
}

func TestOrderedConsumerNextOrder(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

publishFailed := make(chan error, 1)

go func() {
for i := 0; i < 1000; i++ {
_, err := js.Publish(ctx, "FOO.A", []byte(fmt.Sprintf("%d", 1)))
if err != nil {
publishFailed <- err
}
}
}()

s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < 1000; i++ {

select {
case err := <-publishFailed:
t.Fatalf("Publish error: %v", err)
default:
}

msg, err := c.Next(jetstream.FetchMaxWait(5 * time.Second))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
meta, err := msg.Metadata()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if meta.Sequence.Stream != uint64(i+1) {
t.Fatalf("Unexpected sequence number: %d", meta.Sequence.Stream)
}
}
}

0 comments on commit 48e070d

Please sign in to comment.