Skip to content

Commit

Permalink
rpcclient: Add a lifetime to requests.
Browse files Browse the repository at this point in the history
Store the initial context in result types and stop waiting on context
done with ErrRequestCanceled by giving all result types a context field
and having receiveFuture stop waiting on context done with an
ErrRequestCanceled. Result types have been changed from a channel of
*response to a new cmdRes type that can hold the context.

Remove duplicate funtion futureError and unused sendCmdAndWait.
  • Loading branch information
JoeGruffins committed May 23, 2020
1 parent 2320bd2 commit 04f4888
Show file tree
Hide file tree
Showing 9 changed files with 482 additions and 482 deletions.
252 changes: 126 additions & 126 deletions rpcclient/chain.go

Large diffs are not rendered by default.

248 changes: 124 additions & 124 deletions rpcclient/extensions.go

Large diffs are not rendered by default.

54 changes: 27 additions & 27 deletions rpcclient/infrastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ var (
// client having already connected to the RPC server.
ErrClientAlreadyConnected = errors.New("websocket client has already " +
"connected")

// ErrRequestCanceled is an error to describe the condition where
// a request was canceled by the caller by terminating the passed
// context.
ErrRequestCanceled = errors.New("request was canceled by the caller")
)

const (
Expand All @@ -93,6 +98,12 @@ const (
pingInterval = time.Second * 10
)

// cmdRes holds command results.
type cmdRes struct {
ctx context.Context
c chan *response
}

// sendPostDetails houses an HTTP POST request to send to an RPC server as well
// as the original JSON-RPC command and a channel to reply on when the server
// responds with the result.
Expand Down Expand Up @@ -323,13 +334,6 @@ type response struct {
err error
}

// futureError returns a buffered response channel containing the error.
func futureError(err error) chan *response {
c := make(chan *response, 1)
c <- &response{err: err}
return c
}

// result checks whether the unmarshaled response contains a non-nil error,
// returning an unmarshaled dcrjson.RPCError (or an unmarshaling error) if so.
// If the response is not an error, the raw bytes of the request are
Expand Down Expand Up @@ -831,20 +835,25 @@ func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) {
// newFutureError returns a new future result channel that already has the
// passed error waiting on the channel with the reply set to nil. This is
// useful to easily return errors from the various Async functions.
func newFutureError(err error) chan *response {
func newFutureError(ctx context.Context, err error) *cmdRes {
responseChan := make(chan *response, 1)
responseChan <- &response{err: err}
return responseChan
return &cmdRes{ctx: ctx, c: responseChan}
}

// receiveFuture receives from the passed futureResult channel to extract a
// reply or any errors. The examined errors include an error in the
// futureResult and the error in the reply from the server. This will block
// until the result is available on the passed channel.
func receiveFuture(f chan *response) ([]byte, error) {
// Wait for a response on the returned channel.
r := <-f
return r.result, r.err
// until the result is available on the passed channel or the passed context
// is done.
func receiveFuture(ctx context.Context, f chan *response) ([]byte, error) {
// Wait for a response on the returned channel or context done.
select {
case r := <-f:
return r.result, r.err
case <-ctx.Done():
return nil, ErrRequestCanceled
}
}

// sendPost sends the passed request to the server by issuing an HTTP POST
Expand Down Expand Up @@ -913,18 +922,18 @@ func (c *Client) sendRequest(ctx context.Context, jReq *jsonRequest) {
// response channel on which the reply will be delivered at some point in the
// future. It handles both websocket and HTTP POST mode depending on the
// configuration of the client.
func (c *Client) sendCmd(ctx context.Context, cmd interface{}) chan *response {
func (c *Client) sendCmd(ctx context.Context, cmd interface{}) *cmdRes {
// Get the method associated with the command.
method, err := dcrjson.CmdMethod(cmd)
if err != nil {
return newFutureError(err)
return newFutureError(ctx, err)
}

// Marshal the command.
id := c.NextID()
marshalledJSON, err := dcrjson.MarshalCmd("1.0", id, cmd)
if err != nil {
return newFutureError(err)
return newFutureError(ctx, err)
}

// Generate the request and send it along with a channel to respond on.
Expand All @@ -938,16 +947,7 @@ func (c *Client) sendCmd(ctx context.Context, cmd interface{}) chan *response {
}
c.sendRequest(ctx, jReq)

return responseChan
}

// sendCmdAndWait sends the passed command to the associated server, waits
// for the reply, and returns the result from it. It will return the error
// field in the reply if there is one.
func (c *Client) sendCmdAndWait(ctx context.Context, cmd interface{}) (interface{}, error) {
// Marshal the command to JSON-RPC, send it to the connected server, and
// wait for a response on the returned channel.
return receiveFuture(c.sendCmd(ctx, cmd))
return &cmdRes{ctx: ctx, c: responseChan}
}

// Disconnected returns whether or not the server is disconnected. If a
Expand Down
Loading

0 comments on commit 04f4888

Please sign in to comment.