Skip to content

Commit

Permalink
chore: revise abci.Client, Async() interfaces (#169) (#226)
Browse files Browse the repository at this point in the history
* chore: revise abci.Client, Async() interfaces

* chore: regen mock w/ mockery 2.7.4

* fix: lint error

* fix: test_race

* mempool.Flush() flushes all txs from mempool so it should get `Lock()` instead of `RLock()`
  • Loading branch information
egonspace committed Jul 8, 2021
1 parent 4417941 commit e47f13d
Show file tree
Hide file tree
Showing 15 changed files with 532 additions and 604 deletions.
120 changes: 55 additions & 65 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,30 @@ const (
type Client interface {
service.Service

SetResponseCallback(Callback)
SetGlobalCallback(GlobalCallback)
GetGlobalCallback() GlobalCallback
Error() error

FlushAsync() *ReqRes
EchoAsync(msg string) *ReqRes
InfoAsync(types.RequestInfo) *ReqRes
SetOptionAsync(types.RequestSetOption) *ReqRes
DeliverTxAsync(types.RequestDeliverTx) *ReqRes
CheckTxAsync(types.RequestCheckTx) *ReqRes
QueryAsync(types.RequestQuery) *ReqRes
CommitAsync() *ReqRes
InitChainAsync(types.RequestInitChain) *ReqRes
BeginBlockAsync(types.RequestBeginBlock) *ReqRes
EndBlockAsync(types.RequestEndBlock) *ReqRes
BeginRecheckTxAsync(types.RequestBeginRecheckTx) *ReqRes
EndRecheckTxAsync(types.RequestEndRecheckTx) *ReqRes
ListSnapshotsAsync(types.RequestListSnapshots) *ReqRes
OfferSnapshotAsync(types.RequestOfferSnapshot) *ReqRes
LoadSnapshotChunkAsync(types.RequestLoadSnapshotChunk) *ReqRes
ApplySnapshotChunkAsync(types.RequestApplySnapshotChunk) *ReqRes

FlushSync() error
EchoSync(msg string) (*types.ResponseEcho, error)
FlushAsync(ResponseCallback) *ReqRes
EchoAsync(string, ResponseCallback) *ReqRes
InfoAsync(types.RequestInfo, ResponseCallback) *ReqRes
SetOptionAsync(types.RequestSetOption, ResponseCallback) *ReqRes
DeliverTxAsync(types.RequestDeliverTx, ResponseCallback) *ReqRes
CheckTxAsync(types.RequestCheckTx, ResponseCallback) *ReqRes
QueryAsync(types.RequestQuery, ResponseCallback) *ReqRes
CommitAsync(ResponseCallback) *ReqRes
InitChainAsync(types.RequestInitChain, ResponseCallback) *ReqRes
BeginBlockAsync(types.RequestBeginBlock, ResponseCallback) *ReqRes
EndBlockAsync(types.RequestEndBlock, ResponseCallback) *ReqRes
BeginRecheckTxAsync(types.RequestBeginRecheckTx, ResponseCallback) *ReqRes
EndRecheckTxAsync(types.RequestEndRecheckTx, ResponseCallback) *ReqRes
ListSnapshotsAsync(types.RequestListSnapshots, ResponseCallback) *ReqRes
OfferSnapshotAsync(types.RequestOfferSnapshot, ResponseCallback) *ReqRes
LoadSnapshotChunkAsync(types.RequestLoadSnapshotChunk, ResponseCallback) *ReqRes
ApplySnapshotChunkAsync(types.RequestApplySnapshotChunk, ResponseCallback) *ReqRes

FlushSync() (*types.ResponseFlush, error)
EchoSync(string) (*types.ResponseEcho, error)
InfoSync(types.RequestInfo) (*types.ResponseInfo, error)
SetOptionSync(types.RequestSetOption) (*types.ResponseSetOption, error)
DeliverTxSync(types.RequestDeliverTx) (*types.ResponseDeliverTx, error)
Expand Down Expand Up @@ -79,73 +80,62 @@ func NewClient(addr, transport string, mustConnect bool) (client Client, err err
return
}

type Callback func(*types.Request, *types.Response)
type GlobalCallback func(*types.Request, *types.Response)
type ResponseCallback func(*types.Response)

type ReqRes struct {
*types.Request
*sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx tmsync.Mutex
done bool // Gets set to true once *after* WaitGroup.Done().
cb func(*types.Response) // A single callback that may be set.
wg *sync.WaitGroup
done bool // Gets set to true once *after* WaitGroup.Done().
cb ResponseCallback // A single callback that may be set.
}

func NewReqRes(req *types.Request) *ReqRes {
func NewReqRes(req *types.Request, cb ResponseCallback) *ReqRes {
return &ReqRes{
Request: req,
WaitGroup: waitGroup1(),
Response: nil,
Request: req,
Response: nil,

wg: waitGroup1(),
done: false,
cb: nil,
cb: cb,
}
}

// Sets sets the callback. If reqRes is already done, it will call the cb
// immediately. Note, reqRes.cb should not change if reqRes.done and only one
// callback is supported.
func (r *ReqRes) SetCallback(cb func(res *types.Response)) {
r.mtx.Lock()
// InvokeCallback invokes a thread-safe execution of the configured callback
// if non-nil.
func (reqRes *ReqRes) InvokeCallback() {
reqRes.mtx.Lock()
defer reqRes.mtx.Unlock()

if r.done {
r.mtx.Unlock()
cb(r.Response)
return
if reqRes.cb != nil {
reqRes.cb(reqRes.Response)
}

r.cb = cb
r.mtx.Unlock()
}

// InvokeCallback invokes a thread-safe execution of the configured callback
// if non-nil.
func (r *ReqRes) InvokeCallback() {
r.mtx.Lock()
defer r.mtx.Unlock()
func (reqRes *ReqRes) SetDone(res *types.Response) (set bool) {
reqRes.mtx.Lock()
// TODO should we panic if it's already done?
set = !reqRes.done
if set {
reqRes.Response = res
reqRes.done = true
reqRes.wg.Done()
}
reqRes.mtx.Unlock()

if r.cb != nil {
r.cb(r.Response)
// NOTE `reqRes.cb` is immutable so we're safe to access it at here without `mtx`
if set && reqRes.cb != nil {
reqRes.cb(res)
}
}

// GetCallback returns the configured callback of the ReqRes object which may be
// nil. Note, it is not safe to concurrently call this in cases where it is
// marked done and SetCallback is called before calling GetCallback as that
// will invoke the callback twice and create a potential race condition.
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.cb
return set
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
r.mtx.Lock()
r.done = true
r.mtx.Unlock()
func (reqRes *ReqRes) Wait() {
reqRes.wg.Wait()
}

func waitGroup1() (wg *sync.WaitGroup) {
Expand Down
Loading

0 comments on commit e47f13d

Please sign in to comment.