Skip to content

Commit

Permalink
rpc: add support for batched requests/responses (tendermint#3534)
Browse files Browse the repository at this point in the history
Continues from tendermint#3280 in building support for batched requests/responses in the JSON RPC (as per issue tendermint#3213).

* Add JSON RPC batching for client and server

As per tendermint#3213, this adds support for [JSON RPC batch requests and
responses](https://www.jsonrpc.org/specification#batch).

* Add additional checks to ensure client responses are the same as results

* Fix case where a notification is sent and no response is expected

* Add test to check that JSON RPC notifications in a batch are left out in responses

* Update CHANGELOG_PENDING.md

* Update PR number now that PR has been created

* Make errors start with lowercase letter

* Refactor batch functionality to be standalone

This refactors the batching functionality to rather act in a standalone
way. In light of supporting concurrent goroutines making use of the same
client, it would make sense to have batching functionality where one
could create a batch of requests per goroutine and send that batch
without interfering with a batch from another goroutine.

* Add examples for simple and batch HTTP client usage

* Check errors from writer and remove nolinter directives

* Make error strings start with lowercase letter

* Refactor examples to make them testable

* Use safer deferred shutdown for example Tendermint test node

* Recompose rpcClient interface from pre-existing interface components

* Rename WaitGroup for brevity

* Replace empty ID string with request ID

* Remove extraneous test case

* Convert first letter of errors.Wrap() messages to lowercase

* Remove extraneous function parameter

* Make variable declaration terse

* Reorder WaitGroup.Done call to help prevent race conditions in the face of failure

* Swap mutex to value representation and remove initialization

* Restore empty JSONRPC string ID in response to prevent nil

* Make JSONRPCBufferedRequest private

* Revert PR hard link in CHANGELOG_PENDING

* Add client ID for JSONRPCClient

This adds code to automatically generate a randomized client ID for the
JSONRPCClient, and adds a check of the IDs in the responses (if one was
set in the requests).

* Extract response ID validation into separate function

* Remove extraneous comments

* Reorder fields to indicate clearly which are protected by the mutex

* Refactor for loop to remove indexing

* Restructure and combine loop

* Flatten conditional block for better readability

* Make multi-variable declaration slightly more readable

* Change for loop style

* Compress error check statements

* Make function description more generic to show that we support different protocols

* Preallocate memory for request and result objects
  • Loading branch information
thanethomson authored and brapse committed Jun 5, 2019
1 parent 603982c commit 429c45b
Show file tree
Hide file tree
Showing 11 changed files with 746 additions and 132 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
### FEATURES:

### IMPROVEMENTS:
- [rpc] [\#3534](https://github.com/tendermint/tendermint/pull/3534) Add support for batched requests/responses in JSON RPC

### BUG FIXES:
126 changes: 126 additions & 0 deletions rpc/client/examples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package client_test

import (
"bytes"
"fmt"

"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctest "github.com/tendermint/tendermint/rpc/test"
)

func ExampleHTTP_simple() {
// Start a tendermint node (and kvstore) in the background to test against
app := kvstore.NewKVStoreApplication()
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
defer rpctest.StopTendermint(node)

// Create our RPC client
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
c := client.NewHTTP(rpcAddr, "/websocket")

// Create a transaction
k := []byte("name")
v := []byte("satoshi")
tx := append(k, append([]byte("="), v...)...)

// Broadcast the transaction and wait for it to commit (rather use
// c.BroadcastTxSync though in production)
bres, err := c.BroadcastTxCommit(tx)
if err != nil {
panic(err)
}
if bres.CheckTx.IsErr() || bres.DeliverTx.IsErr() {
panic("BroadcastTxCommit transaction failed")
}

// Now try to fetch the value for the key
qres, err := c.ABCIQuery("/key", k)
if err != nil {
panic(err)
}
if qres.Response.IsErr() {
panic("ABCIQuery failed")
}
if !bytes.Equal(qres.Response.Key, k) {
panic("returned key does not match queried key")
}
if !bytes.Equal(qres.Response.Value, v) {
panic("returned value does not match sent value")
}

fmt.Println("Sent tx :", string(tx))
fmt.Println("Queried for :", string(qres.Response.Key))
fmt.Println("Got value :", string(qres.Response.Value))

// Output:
// Sent tx : name=satoshi
// Queried for : name
// Got value : satoshi
}

func ExampleHTTP_batching() {
// Start a tendermint node (and kvstore) in the background to test against
app := kvstore.NewKVStoreApplication()
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
defer rpctest.StopTendermint(node)

// Create our RPC client
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
c := client.NewHTTP(rpcAddr, "/websocket")

// Create our two transactions
k1 := []byte("firstName")
v1 := []byte("satoshi")
tx1 := append(k1, append([]byte("="), v1...)...)

k2 := []byte("lastName")
v2 := []byte("nakamoto")
tx2 := append(k2, append([]byte("="), v2...)...)

txs := [][]byte{tx1, tx2}

// Create a new batch
batch := c.NewBatch()

// Queue up our transactions
for _, tx := range txs {
if _, err := batch.BroadcastTxCommit(tx); err != nil {
panic(err)
}
}

// Send the batch of 2 transactions
if _, err := batch.Send(); err != nil {
panic(err)
}

// Now let's query for the original results as a batch
keys := [][]byte{k1, k2}
for _, key := range keys {
if _, err := batch.ABCIQuery("/key", key); err != nil {
panic(err)
}
}

// Send the 2 queries and keep the results
results, err := batch.Send()
if err != nil {
panic(err)
}

// Each result in the returned list is the deserialized result of each
// respective ABCIQuery response
for _, result := range results {
qr, ok := result.(*ctypes.ResultABCIQuery)
if !ok {
panic("invalid result type from ABCIQuery request")
}
fmt.Println(string(qr.Response.Key), "=", string(qr.Response.Value))
}

// Output:
// firstName = satoshi
// lastName = nakamoto
}
2 changes: 1 addition & 1 deletion rpc/client/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Waiter func(delta int64) (abort error)
// but you can plug in another one
func DefaultWaitStrategy(delta int64) (abort error) {
if delta > 10 {
return errors.Errorf("Waiting for %d blocks... aborting", delta)
return errors.Errorf("waiting for %d blocks... aborting", delta)
} else if delta > 0 {
// estimate of wait time....
// wait half a second for the next block (in progress)
Expand Down
Loading

0 comments on commit 429c45b

Please sign in to comment.