Skip to content

Commit

Permalink
Add ctx argument to receiveFuture.
Browse files Browse the repository at this point in the history
Rather than create go routines to wait and send on individual context
done, accept a context in the waiting function and return an
ErrRequestCanceled if the context is terminated.
  • Loading branch information
JoeGruffins committed May 16, 2020
1 parent 9d2ba7b commit ce45637
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 279 deletions.
136 changes: 68 additions & 68 deletions rpcclient/chain.go

Large diffs are not rendered by default.

144 changes: 72 additions & 72 deletions rpcclient/extensions.go

Large diffs are not rendered by default.

45 changes: 13 additions & 32 deletions rpcclient/infrastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,10 +845,16 @@ func newFutureError(err error) chan *response {
// receiveFuture receives from the passed futureResult channel to extract a
// reply or any errors. The examined errors include an error in the
// futureResult and the error in the reply from the server. This will block
// until the result is available on the passed channel.
func receiveFuture(f chan *response) ([]byte, error) {
// Wait for a response on the returned channel.
r := <-f
// until the result is available on the passed channel or the passed context
// is done.
func receiveFuture(ctx context.Context, f chan *response) ([]byte, error) {
// Wait for a response on the returned channel or context done.
r := new(response)
select {
case r = <-f:
case <-ctx.Done():
r.err = ErrRequestCanceled
}
return r.result, r.err
}

Expand Down Expand Up @@ -917,9 +923,7 @@ func (c *Client) sendRequest(ctx context.Context, jReq *jsonRequest) {
// sendCmd sends the passed command to the associated server and returns a
// response channel on which the reply will be delivered at some point in the
// future. It handles both websocket and HTTP POST mode depending on the
// configuration of the client. If the passed context is done before receiving
// a response ErrRequestCanceled is sent over the response channel and the
// request is removed from the request map.
// configuration of the client.
func (c *Client) sendCmd(ctx context.Context, cmd interface{}) chan *response {
// Get the method associated with the command.
method, err := dcrjson.CmdMethod(cmd)
Expand All @@ -935,36 +939,13 @@ func (c *Client) sendCmd(ctx context.Context, cmd interface{}) chan *response {
}

responseChan := make(chan *response, 1)
relayChan := make(chan *response, 1)
// Pass results along for as long as ctx is not done.
go func() {
select {
case res := <-relayChan:
responseChan <- res
case <-ctx.Done():
responseChan <- &response{err: ErrRequestCanceled}
// Remove the request from the resend queue.
var nextElem *list.Element
c.requestLock.Lock()
for e := c.requestList.Front(); e != nil; e = nextElem {
nextElem = e.Next()
jReq := e.Value.(*jsonRequest)
if jReq.id == id {
delete(c.requestMap, id)
c.requestList.Remove(e)
break
}
}
c.requestLock.Unlock()
}
}()
// Generate the request and send it along with a channel to respond on.
jReq := &jsonRequest{
id: id,
method: method,
cmd: cmd,
marshalledJSON: marshalledJSON,
responseChan: relayChan,
responseChan: responseChan,
}
c.sendRequest(ctx, jReq)

Expand All @@ -977,7 +958,7 @@ func (c *Client) sendCmd(ctx context.Context, cmd interface{}) chan *response {
func (c *Client) sendCmdAndWait(ctx context.Context, cmd interface{}) (interface{}, error) {
// Marshal the command to JSON-RPC, send it to the connected server, and
// wait for a response on the returned channel.
return receiveFuture(c.sendCmd(ctx, cmd))
return receiveFuture(ctx, c.sendCmd(ctx, cmd))
}

// Disconnected returns whether or not the server is disconnected. If a
Expand Down
64 changes: 32 additions & 32 deletions rpcclient/mining.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type FutureGenerateResult chan *response

// Receive waits for the response promised by the future and returns a list of
// block hashes generated by the call.
func (r FutureGenerateResult) Receive() ([]*chainhash.Hash, error) {
res, err := receiveFuture(r)
func (r FutureGenerateResult) Receive(ctx context.Context) ([]*chainhash.Hash, error) {
res, err := receiveFuture(ctx, r)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -60,7 +60,7 @@ func (c *Client) GenerateAsync(ctx context.Context, numBlocks uint32) FutureGene

// Generate generates numBlocks blocks and returns their hashes.
func (c *Client) Generate(ctx context.Context, numBlocks uint32) ([]*chainhash.Hash, error) {
return c.GenerateAsync(ctx, numBlocks).Receive()
return c.GenerateAsync(ctx, numBlocks).Receive(ctx)
}

// FutureGetGenerateResult is a future promise to deliver the result of a
Expand All @@ -69,8 +69,8 @@ type FutureGetGenerateResult chan *response

// Receive waits for the response promised by the future and returns true if the
// server is set to mine, otherwise false.
func (r FutureGetGenerateResult) Receive() (bool, error) {
res, err := receiveFuture(r)
func (r FutureGetGenerateResult) Receive(ctx context.Context) (bool, error) {
res, err := receiveFuture(ctx, r)
if err != nil {
return false, err
}
Expand All @@ -97,7 +97,7 @@ func (c *Client) GetGenerateAsync(ctx context.Context) FutureGetGenerateResult {

// GetGenerate returns true if the server is set to mine, otherwise false.
func (c *Client) GetGenerate(ctx context.Context) (bool, error) {
return c.GetGenerateAsync(ctx).Receive()
return c.GetGenerateAsync(ctx).Receive(ctx)
}

// FutureSetGenerateResult is a future promise to deliver the result of a
Expand All @@ -106,8 +106,8 @@ type FutureSetGenerateResult chan *response

// Receive waits for the response promised by the future and returns an error if
// any occurred when setting the server to generate coins (mine) or not.
func (r FutureSetGenerateResult) Receive() error {
_, err := receiveFuture(r)
func (r FutureSetGenerateResult) Receive(ctx context.Context) error {
_, err := receiveFuture(ctx, r)
return err
}

Expand All @@ -123,7 +123,7 @@ func (c *Client) SetGenerateAsync(ctx context.Context, enable bool, numCPUs int)

// SetGenerate sets the server to generate coins (mine) or not.
func (c *Client) SetGenerate(ctx context.Context, enable bool, numCPUs int) error {
return c.SetGenerateAsync(ctx, enable, numCPUs).Receive()
return c.SetGenerateAsync(ctx, enable, numCPUs).Receive(ctx)
}

// FutureGetHashesPerSecResult is a future promise to deliver the result of a
Expand All @@ -133,8 +133,8 @@ type FutureGetHashesPerSecResult chan *response
// Receive waits for the response promised by the future and returns a recent
// hashes per second performance measurement while generating coins (mining).
// Zero is returned if the server is not mining.
func (r FutureGetHashesPerSecResult) Receive() (int64, error) {
res, err := receiveFuture(r)
func (r FutureGetHashesPerSecResult) Receive(ctx context.Context) (int64, error) {
res, err := receiveFuture(ctx, r)
if err != nil {
return -1, err
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *Client) GetHashesPerSecAsync(ctx context.Context) FutureGetHashesPerSec
// while generating coins (mining). Zero is returned if the server is not
// mining.
func (c *Client) GetHashesPerSec(ctx context.Context) (int64, error) {
return c.GetHashesPerSecAsync(ctx).Receive()
return c.GetHashesPerSecAsync(ctx).Receive(ctx)
}

// FutureGetMiningInfoResult is a future promise to deliver the result of a
Expand All @@ -172,8 +172,8 @@ type FutureGetMiningInfoResult chan *response

// Receive waits for the response promised by the future and returns the mining
// information.
func (r FutureGetMiningInfoResult) Receive() (*chainjson.GetMiningInfoResult, error) {
res, err := receiveFuture(r)
func (r FutureGetMiningInfoResult) Receive(ctx context.Context) (*chainjson.GetMiningInfoResult, error) {
res, err := receiveFuture(ctx, r)
if err != nil {
return nil, err
}
Expand All @@ -200,7 +200,7 @@ func (c *Client) GetMiningInfoAsync(ctx context.Context) FutureGetMiningInfoResu

// GetMiningInfo returns mining information.
func (c *Client) GetMiningInfo(ctx context.Context) (*chainjson.GetMiningInfoResult, error) {
return c.GetMiningInfoAsync(ctx).Receive()
return c.GetMiningInfoAsync(ctx).Receive(ctx)
}

// FutureGetNetworkHashPS is a future promise to deliver the result of a
Expand All @@ -210,8 +210,8 @@ type FutureGetNetworkHashPS chan *response
// Receive waits for the response promised by the future and returns the
// estimated network hashes per second for the block heights provided by the
// parameters.
func (r FutureGetNetworkHashPS) Receive() (int64, error) {
res, err := receiveFuture(r)
func (r FutureGetNetworkHashPS) Receive(ctx context.Context) (int64, error) {
res, err := receiveFuture(ctx, r)
if err != nil {
return -1, err
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func (c *Client) GetNetworkHashPSAsync(ctx context.Context) FutureGetNetworkHash
// See GetNetworkHashPS2 to override the number of blocks to use and
// GetNetworkHashPS3 to override the height at which to calculate the estimate.
func (c *Client) GetNetworkHashPS(ctx context.Context) (int64, error) {
return c.GetNetworkHashPSAsync(ctx).Receive()
return c.GetNetworkHashPSAsync(ctx).Receive(ctx)
}

// GetNetworkHashPS2Async returns an instance of a type that can be used to get
Expand All @@ -263,7 +263,7 @@ func (c *Client) GetNetworkHashPS2Async(ctx context.Context, blocks int) FutureG
// See GetNetworkHashPS to use defaults and GetNetworkHashPS3 to override the
// height at which to calculate the estimate.
func (c *Client) GetNetworkHashPS2(ctx context.Context, blocks int) (int64, error) {
return c.GetNetworkHashPS2Async(ctx, blocks).Receive()
return c.GetNetworkHashPS2Async(ctx, blocks).Receive(ctx)
}

// GetNetworkHashPS3Async returns an instance of a type that can be used to get
Expand All @@ -283,7 +283,7 @@ func (c *Client) GetNetworkHashPS3Async(ctx context.Context, blocks, height int)
//
// See GetNetworkHashPS and GetNetworkHashPS2 to use defaults.
func (c *Client) GetNetworkHashPS3(ctx context.Context, blocks, height int) (int64, error) {
return c.GetNetworkHashPS3Async(ctx, blocks, height).Receive()
return c.GetNetworkHashPS3Async(ctx, blocks, height).Receive(ctx)
}

// FutureGetWork is a future promise to deliver the result of a
Expand All @@ -292,8 +292,8 @@ type FutureGetWork chan *response

// Receive waits for the response promised by the future and returns the hash
// data to work on.
func (r FutureGetWork) Receive() (*chainjson.GetWorkResult, error) {
res, err := receiveFuture(r)
func (r FutureGetWork) Receive(ctx context.Context) (*chainjson.GetWorkResult, error) {
res, err := receiveFuture(ctx, r)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (c *Client) GetWorkAsync(ctx context.Context) FutureGetWork {
//
// See GetWorkSubmit to submit the found solution.
func (c *Client) GetWork(ctx context.Context) (*chainjson.GetWorkResult, error) {
return c.GetWorkAsync(ctx).Receive()
return c.GetWorkAsync(ctx).Receive(ctx)
}

// FutureGetWorkSubmit is a future promise to deliver the result of a
Expand All @@ -331,8 +331,8 @@ type FutureGetWorkSubmit chan *response

// Receive waits for the response promised by the future and returns whether
// or not the submitted block header was accepted.
func (r FutureGetWorkSubmit) Receive() (bool, error) {
res, err := receiveFuture(r)
func (r FutureGetWorkSubmit) Receive(ctx context.Context) (bool, error) {
res, err := receiveFuture(ctx, r)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func (c *Client) GetWorkSubmitAsync(ctx context.Context, data string) FutureGetW
//
// See GetWork to request data to work on.
func (c *Client) GetWorkSubmit(ctx context.Context, data string) (bool, error) {
return c.GetWorkSubmitAsync(ctx, data).Receive()
return c.GetWorkSubmitAsync(ctx, data).Receive(ctx)
}

// FutureSubmitBlockResult is a future promise to deliver the result of a
Expand All @@ -371,8 +371,8 @@ type FutureSubmitBlockResult chan *response

// Receive waits for the response promised by the future and returns an error if
// any occurred when submitting the block.
func (r FutureSubmitBlockResult) Receive() error {
res, err := receiveFuture(r)
func (r FutureSubmitBlockResult) Receive(ctx context.Context) error {
res, err := receiveFuture(ctx, r)
if err != nil {
return err
}
Expand Down Expand Up @@ -413,16 +413,16 @@ func (c *Client) SubmitBlockAsync(ctx context.Context, block *dcrutil.Block, opt

// SubmitBlock attempts to submit a new block into the Decred network.
func (c *Client) SubmitBlock(ctx context.Context, block *dcrutil.Block, options *chainjson.SubmitBlockOptions) error {
return c.SubmitBlockAsync(ctx, block, options).Receive()
return c.SubmitBlockAsync(ctx, block, options).Receive(ctx)
}

// FutureRegenTemplateResult is a future promise to deliver the result of a
// RegenTemplate RPC invocation (or an applicable error).
type FutureRegenTemplateResult chan *response

// Receive waits for the response and returns an error if any has occurred.
func (r FutureRegenTemplateResult) Receive() error {
_, err := receiveFuture(r)
func (r FutureRegenTemplateResult) Receive(ctx context.Context) error {
_, err := receiveFuture(ctx, r)
return err
}

Expand All @@ -440,5 +440,5 @@ func (c *Client) RegenTemplateAsync(ctx context.Context) FutureRegenTemplateResu
// that template generation is currently asynchronous, therefore no guarantees
// are made for when or whether a new template will actually be available.
func (c *Client) RegenTemplate(ctx context.Context) error {
return c.RegenTemplateAsync(ctx).Receive()
return c.RegenTemplateAsync(ctx).Receive(ctx)
}
Loading

0 comments on commit ce45637

Please sign in to comment.