Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Retry PublishAsync on no responders #1464

Merged
merged 2 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions jetstream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,16 +322,24 @@ func WithExpectLastMsgID(id string) PublishOpt {
}

// WithRetryWait sets the retry wait time when ErrNoResponders is encountered.
// Defaults to 2
Jarema marked this conversation as resolved.
Show resolved Hide resolved
func WithRetryWait(dur time.Duration) PublishOpt {
return func(opts *pubOpts) error {
if dur <= 0 {
return fmt.Errorf("%w: retry wait should be more than 0", ErrInvalidOption)
}
opts.retryWait = dur
return nil
}
}

// WithRetryAttempts sets the retry number of attempts when ErrNoResponders is encountered.
// Defaults to 250ms
Jarema marked this conversation as resolved.
Show resolved Hide resolved
func WithRetryAttempts(num int) PublishOpt {
return func(opts *pubOpts) error {
if num < 0 {
return fmt.Errorf("%w: retry attempts cannot be negative", ErrInvalidOption)
}
opts.retryAttempts = num
return nil
}
Expand Down
122 changes: 79 additions & 43 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type (

// stallWait is the max wait of a async pub ack.
stallWait time.Duration

// internal option to re-use existing paf in case of retry.
pafRetry *pubAckFuture
}

// PubAckFuture is a future for a PubAck.
Expand All @@ -67,12 +70,15 @@ type (
}

pubAckFuture struct {
jsClient *jetStreamClient
msg *nats.Msg
ack *PubAck
err error
errCh chan error
doneCh chan *PubAck
jsClient *jetStreamClient
msg *nats.Msg
retries int
maxRetries int
retryWait time.Duration
ack *PubAck
err error
errCh chan error
doneCh chan *PubAck
}

jetStreamClient struct {
Expand Down Expand Up @@ -211,7 +217,10 @@ func (js *jetStream) PublishAsync(subj string, data []byte, opts ...PublishOpt)
}

func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) {
var o pubOpts
o := pubOpts{
retryWait: DefaultPubRetryWait,
retryAttempts: DefaultPubRetryAttempts,
}
if len(opts) > 0 {
if m.Header == nil {
m.Header = nats.Header{}
Expand Down Expand Up @@ -245,30 +254,38 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}

// Reply
if m.Reply != "" {
paf := o.pafRetry
if paf == nil && m.Reply != "" {
return nil, ErrAsyncPublishReplySubjectSet
}
var err error
reply := m.Reply
m.Reply, err = js.newAsyncReply()
if err != nil {
return nil, fmt.Errorf("nats: error creating async reply handler: %s", err)
}
defer func() { m.Reply = reply }()

id := m.Reply[aReplyPreLen:]
paf := &pubAckFuture{msg: m, jsClient: js.publisher}
numPending, maxPending := js.registerPAF(id, paf)
var id string

if maxPending > 0 && numPending > maxPending {
select {
case <-js.asyncStall():
case <-time.After(stallWait):
js.clearPAF(id)
return nil, ErrTooManyStalledMsgs
// register new paf if not retrying
if paf == nil {
var err error
m.Reply, err = js.newAsyncReply()
defer func() { m.Reply = "" }()
Jarema marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("nats: error creating async reply handler: %s", err)
}
id = m.Reply[aReplyPreLen:]
paf = &pubAckFuture{msg: m, jsClient: js.publisher, maxRetries: o.retryAttempts, retryWait: o.retryWait}
numPending, maxPending := js.registerPAF(id, paf)

if maxPending > 0 && numPending > maxPending {
select {
case <-js.asyncStall():
case <-time.After(stallWait):
js.clearPAF(id)
return nil, ErrTooManyStalledMsgs
}
}
} else {
// when retrying, get the ID from existing reply subject
id = m.Reply[aReplyPreLen:]
}

if err := js.conn.PublishMsg(m); err != nil {
js.clearPAF(id)
return nil, err
Expand All @@ -277,7 +294,7 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
return paf, nil
}

// For quick token lookup etjs.
// For quick token lookup etc.
const (
aReplyPreLen = 14
aReplyTokensize = 6
Expand Down Expand Up @@ -327,26 +344,12 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
id := m.Subject[aReplyPreLen:]

js.publisher.Lock()

paf := js.getPAF(id)
if paf == nil {
js.publisher.Unlock()
return
}
// Remove
delete(js.publisher.acks, id)

// Check on anyone stalled and waiting.
if js.publisher.stallCh != nil && len(js.publisher.acks) < js.publisher.asyncPublisherOpts.maxpa {
close(js.publisher.stallCh)
js.publisher.stallCh = nil
}
// Check on anyone waiting on done status.
if js.publisher.doneCh != nil && len(js.publisher.acks) == 0 {
dch := js.publisher.doneCh
js.publisher.doneCh = nil
// Defer here so error is processed and can be checked.
defer close(dch)
}

doErr := func(err error) {
paf.err = err
Expand All @@ -360,12 +363,45 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
}
}

// Process no responders etjs.
// Process no responders etc.
if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
doErr(nats.ErrNoResponders)
if paf.retries < paf.maxRetries {
paf.retries++
paf.msg.Reply = m.Subject
time.AfterFunc(paf.retryWait, func() {
_, err := js.PublishMsgAsync(paf.msg, func(po *pubOpts) error {
po.pafRetry = paf
return nil
})
if err != nil {
js.publisher.Lock()
doErr(err)
}
})
js.publisher.Unlock()
return
}
delete(js.publisher.acks, id)
doErr(ErrNoStreamResponse)
return
}

// Remove
delete(js.publisher.acks, id)

// Check on anyone stalled and waiting.
if js.publisher.stallCh != nil && len(js.publisher.acks) < js.publisher.asyncPublisherOpts.maxpa {
close(js.publisher.stallCh)
js.publisher.stallCh = nil
}
// Check on anyone waiting on done status.
if js.publisher.doneCh != nil && len(js.publisher.acks) == 0 {
dch := js.publisher.doneCh
js.publisher.doneCh = nil
// Defer here so error is processed and can be checked.
defer close(dch)
}

var pa pubAckResponse
if err := json.Unmarshal(m.Data, &pa); err != nil {
doErr(ErrInvalidJSAck)
Expand Down
136 changes: 134 additions & 2 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,8 +1075,59 @@ func TestPublishMsgAsync(t *testing.T) {
Subject: "ABC",
},
withAckError: func(t *testing.T, err error) {
if !errors.Is(err, nats.ErrNoResponders) {
t.Fatalf("Expected error: %v; got: %v", nats.ErrNoResponders, err)
if !errors.Is(err, jetstream.ErrNoStreamResponse) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNoStreamResponse, err)
}
},
},
},
},
{
name: "invalid retry number set",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1",
},
opts: []jetstream.PublishOpt{jetstream.WithRetryAttempts(-1)},
withPublishError: func(t *testing.T, err error) {
if !errors.Is(err, jetstream.ErrInvalidOption) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err)
}
},
},
},
},
{
name: "invalid retry wait set",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1",
},
opts: []jetstream.PublishOpt{jetstream.WithRetryWait(-1)},
withPublishError: func(t *testing.T, err error) {
if !errors.Is(err, jetstream.ErrInvalidOption) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err)
}
},
},
},
},
{
name: "invalid stall wait set",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1",
},
opts: []jetstream.PublishOpt{jetstream.WithStallWait(-1)},
withPublishError: func(t *testing.T, err error) {
if !errors.Is(err, jetstream.ErrInvalidOption) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err)
}
},
},
Expand Down Expand Up @@ -1335,3 +1386,84 @@ func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
}
}
}

func TestAsyncPublishRetry(t *testing.T) {
tests := []struct {
name string
pubOpts []jetstream.PublishOpt
ackError error
}{
{
name: "retry until stream is ready",
pubOpts: []jetstream.PublishOpt{
jetstream.WithRetryAttempts(10),
jetstream.WithRetryWait(100 * time.Millisecond),
},
},
{
name: "fail after max retries",
pubOpts: []jetstream.PublishOpt{
jetstream.WithRetryAttempts(2),
jetstream.WithRetryWait(50 * time.Millisecond),
},
ackError: jetstream.ErrNoStreamResponse,
},
{
name: "retries disabled",
pubOpts: []jetstream.PublishOpt{
jetstream.WithRetryAttempts(0),
},
ackError: jetstream.ErrNoStreamResponse,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// set max pending to 1 so that we can test if retries don't cause stall
js, err := jetstream.New(nc, jetstream.WithPublishAsyncMaxPending(1))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

test.pubOpts = append(test.pubOpts, jetstream.WithStallWait(1*time.Nanosecond))
ack, err := js.PublishAsync("foo", []byte("hello"), test.pubOpts...)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
errs := make(chan error, 1)
go func() {
// create stream with delay so that publish will receive no responders
time.Sleep(300 * time.Millisecond)
if _, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}); err != nil {
errs <- err
}
}()
select {
case <-ack.Ok():
case err := <-ack.Err():
if test.ackError != nil {
if !errors.Is(err, test.ackError) {
t.Fatalf("Expected error: %v; got: %v", test.ackError, err)
}
} else {
t.Fatalf("Unexpected ack error: %v", err)
}
case err := <-errs:
t.Fatalf("Error creating stream: %v", err)
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for ack")
}
})
}
}