Skip to content

Commit

Permalink
Merge pull request #1777 from CortexFoundation/dev
Browse files Browse the repository at this point in the history
event: fix Resubscribe deadlock when unsubscribing after inner sub en…
  • Loading branch information
ucwong authored Oct 25, 2023
2 parents 9d7842b + d2e481e commit 0570844
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
2 changes: 1 addition & 1 deletion event/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscriptio
backoffMax: backoffMax,
fn: fn,
err: make(chan error),
unsub: make(chan struct{}),
unsub: make(chan struct{}, 1),
}
go s.loop()
return s
Expand Down
24 changes: 24 additions & 0 deletions event/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,27 @@ func TestResubscribeWithErrorHandler(t *testing.T) {
t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs)
}
}

func TestResubscribeWithCompletedSubscription(t *testing.T) {
t.Parallel()

quitProducerAck := make(chan struct{})
quitProducer := make(chan struct{})

sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
return NewSubscription(func(unsubscribed <-chan struct{}) error {
select {
case <-quitProducer:
quitProducerAck <- struct{}{}
return nil
case <-unsubscribed:
return nil
}
}), nil
})

// Ensure producer has started and exited before Unsubscribe
close(quitProducer)
<-quitProducerAck
sub.Unsubscribe()
}

0 comments on commit 0570844

Please sign in to comment.