Skip to content

Commit

Permalink
Merge pull request #8829 from zl03jsj/fix/TestPaychGetRestartAfterAdd…
Browse files Browse the repository at this point in the history
…FundsMsg_may_stuck_in_forever_waiting

Fix: PaychGetRestartAfterAddFundsMsg may stuck in forever waiting
  • Loading branch information
magik6k authored Jun 9, 2022
2 parents e3f1eb2 + e732e1b commit 1190050
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 4 deletions.
1 change: 1 addition & 0 deletions paychmgr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func newManager(pchstore *Store, pchapi managerAPI) (*Manager, error) {
channels: make(map[string]*channelAccessor),
pchapi: pchapi,
}
pm.ctx, pm.shutdown = context.WithCancel(context.Background())
return pm, pm.Start()
}

Expand Down
10 changes: 7 additions & 3 deletions paychmgr/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func newMockPaychAPI() *mockPaychAPI {
func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, mcid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) {
pchapi.lk.Lock()

response := make(chan types.MessageReceipt)
response := make(chan types.MessageReceipt, 1)

if response, ok := pchapi.waitingResponses[mcid]; ok {
defer pchapi.lk.Unlock()
Expand All @@ -151,8 +151,12 @@ func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, mcid cid.Cid, conf
pchapi.waitingCalls[mcid] = &waitingCall{response: response}
pchapi.lk.Unlock()

receipt := <-response
return &api.MsgLookup{Receipt: receipt}, nil
select {
case receipt := <-response:
return &api.MsgLookup{Receipt: receipt}, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (pchapi *mockPaychAPI) receiveMsgResponse(mcid cid.Cid, receipt types.MessageReceipt) {
Expand Down
2 changes: 1 addition & 1 deletion paychmgr/paychget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
require.NoError(t, err)

// Simulate shutting down system
mock.close()
require.NoError(t, mgr.Stop())

// Create a new manager with the same datastore
mock2 := newMockManagerAPI()
Expand Down
6 changes: 6 additions & 0 deletions paychmgr/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package paychmgr
import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"sync"
Expand Down Expand Up @@ -351,6 +352,11 @@ func (ca *channelAccessor) queueSize() int {
// msgWaitComplete is called when the message for a previous task is confirmed
// or there is an error.
func (ca *channelAccessor) msgWaitComplete(ctx context.Context, mcid cid.Cid, err error) {
// if context is canceled, should Not mark message to 'bad', just return.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}

ca.lk.Lock()
defer ca.lk.Unlock()

Expand Down

0 comments on commit 1190050

Please sign in to comment.