Skip to content

Commit

Permalink
[FAB-6096] Modify benchmark test code for Kafka
Browse files Browse the repository at this point in the history
1. Move to using profiles for all tests, instead of relying on just
setting the CONFIGTX_ORDERER_ORDERERTYPE ENV var.
2. Switch to 3-broker default for Kafka tests
3. Add sarama verbosity flag in envvars map for easier debugging
4. Fix comments and log statements

Change-Id: I1722a2de7fc413dff3fd877fed2e09e602078cd8
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Sep 11, 2017
1 parent 129d9e5 commit e09a94c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 30 deletions.
2 changes: 1 addition & 1 deletion orderer/common/performance/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func init() {
msp := mspmgmt.GetLocalMSP()
signer, err = msp.GetDefaultSigningIdentity()
if err != nil {
panic(fmt.Errorf("Failed to initialize get default signer: %s", err))
panic(fmt.Errorf("Failed to get default signer: %s", err))
}
}

Expand Down
51 changes: 22 additions & 29 deletions orderer/common/server/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/common/tools/configtxgen/provisional"
"github.com/hyperledger/fabric/orderer/common/localconfig"
perf "github.com/hyperledger/fabric/orderer/common/performance"
cb "github.com/hyperledger/fabric/protos/common"
Expand All @@ -26,10 +25,10 @@ import (

// Usage: BENCHMARK=true go test -run=TestOrdererBenchmark[Solo|Kafka][Broadcast|Deliver]
//
// Benchmark test makes [ch] channels, creates [bc] clients per client per orderer. There are
// [ord] orderer instances in total. A client ONLY interacts with ONE channel and ONE
// orderer, so the number of client in total is [ch * bc * ord]. Note that all clients are
// concurrent.
// Benchmark test makes [ch] channels, creates [bc] clients per client per channel per
// orderer. There are [ord] orderer instances in total. A client ONLY interacts with ONE
// channel and ONE orderer, so the number of client in total is [ch * bc * ord]. Note that
// all clients execute concurrently.
//
// The test sends [tx] transactions of size [kb] in total. These tx are evenly distributed
// among all clients, which gives us [tx / (ch * bc * ord)] tx per client.
Expand All @@ -56,7 +55,7 @@ import (
// ordered. This is important for evaluating elapsed time of async broadcast operations.
//
// Again, each deliver client only interacts with one channel and one orderer, which
// results in [a * f * e] deliver clients in total.
// results in [ch * dc * ord] deliver clients in total.
//
// ch -> channelCounts
// bc -> broadcastClientPerChannel
Expand All @@ -70,8 +69,10 @@ import (
// as deliver is effectively retrieving pre-generated blocks, so it shouldn't be choked
// by slower broadcast.
//
// Note: a Kafka broker listening on localhost:9092 is required to run Kafka based benchmark
// TODO(jay_guo) use ephemeral kafka container for test
// Note: At least three Kafka brokers listening on localhost:[9092-9094] are required to
// run the Kafka-based benchmark. This is set in the `envvars` map and can be adjusted
// if need be.
// TODO Spin up ephemeral Kafka containers for test

const (
MaxMessageCount = 10
Expand All @@ -87,10 +88,11 @@ var envvars = map[string]string{
"ORDERER_GENERAL_GENESISPROFILE": localconfig.SampleDevModeSoloProfile,
"ORDERER_GENERAL_LEDGERTYPE": "file",
"ORDERER_GENERAL_LOGLEVEL": "error",
"ORDERER_KAFKA_VERBOSE": "false",
localconfig.Prefix + "_ORDERER_BATCHSIZE_MAXMESSAGECOUNT": strconv.Itoa(MaxMessageCount),
localconfig.Prefix + "_ORDERER_BATCHSIZE_ABSOLUTEMAXBYTES": strconv.Itoa(AbsoluteMaxBytes) + " KB",
localconfig.Prefix + "_ORDERER_BATCHSIZE_PREFERREDMAXBYTES": strconv.Itoa(PreferredMaxBytes) + " KB",
localconfig.Prefix + "_ORDERER_KAFKA_BROKERS": "[localhost:9092]",
localconfig.Prefix + "_ORDERER_KAFKA_BROKERS": "[localhost:9092, localhost:9093, localhost:9094]",
}

type factors struct {
Expand Down Expand Up @@ -119,16 +121,13 @@ func (f factors) String() string {
// As benchmark tests are skipped by default, we put this test here to catch
// potential code changes that might break benchmark tests. If this test fails,
// it is likely that benchmark tests need to be updated.
func TestOrdererBenchmark(t *testing.T) {
os.Setenv(localconfig.Prefix+"_ORDERER_ORDERERTYPE", provisional.ConsensusTypeSolo)
defer os.Unsetenv(localconfig.Prefix + "_ORDERER_ORDERERTYPE")

func TestOrdererBenchmarkSolo(t *testing.T) {
for key, value := range envvars {
os.Setenv(key, value)
defer os.Unsetenv(key)
}

t.Run("Benchmark Sample Test", func(t *testing.T) {
t.Run("Benchmark Sample Test (Solo)", func(t *testing.T) {
benchmarkOrderer(t, 1, 5, PreferredMaxBytes, 1, 0, 1, true)
})
}
Expand All @@ -139,9 +138,6 @@ func TestOrdererBenchmarkSoloBroadcast(t *testing.T) {
t.Skip("Skipping benchmark test")
}

os.Setenv(localconfig.Prefix+"_ORDERER_ORDERERTYPE", provisional.ConsensusTypeSolo)
defer os.Unsetenv(localconfig.Prefix + "_ORDERER_ORDERERTYPE")

for key, value := range envvars {
os.Setenv(key, value)
defer os.Unsetenv(key)
Expand Down Expand Up @@ -187,9 +183,6 @@ func TestOrdererBenchmarkSoloDeliver(t *testing.T) {
t.Skip("Skipping benchmark test")
}

os.Setenv(localconfig.Prefix+"_ORDERER_ORDERERTYPE", provisional.ConsensusTypeSolo)
defer os.Unsetenv(localconfig.Prefix + "_ORDERER_ORDERERTYPE")

for key, value := range envvars {
os.Setenv(key, value)
defer os.Unsetenv(key)
Expand Down Expand Up @@ -235,14 +228,14 @@ func TestOrdererBenchmarkKafkaBroadcast(t *testing.T) {
t.Skip("Skipping benchmark test")
}

os.Setenv(localconfig.Prefix+"_ORDERER_ORDERERTYPE", provisional.ConsensusTypeKafka)
defer os.Unsetenv(localconfig.Prefix + "_ORDERER_ORDERERTYPE")

for key, value := range envvars {
os.Setenv(key, value)
defer os.Unsetenv(key)
}

os.Setenv("ORDERER_GENERAL_GENESISPROFILE", localconfig.SampleDevModeKafkaProfile)
defer os.Unsetenv("ORDERER_GENERAL_GENESISPROFILE")

var (
channelCounts = []int{1, 10}
totalTx = []int{10000}
Expand Down Expand Up @@ -283,14 +276,14 @@ func TestOrdererBenchmarkKafkaDeliver(t *testing.T) {
t.Skip("Skipping benchmark test")
}

os.Setenv(localconfig.Prefix+"_ORDERER_ORDERERTYPE", provisional.ConsensusTypeKafka)
defer os.Unsetenv(localconfig.Prefix + "_ORDERER_ORDERERTYPE")

for key, value := range envvars {
os.Setenv(key, value)
defer os.Unsetenv(key)
}

os.Setenv("ORDERER_GENERAL_GENESISPROFILE", localconfig.SampleDevModeKafkaProfile)
defer os.Unsetenv("ORDERER_GENERAL_GENESISPROFILE")

var (
channelCounts = []int{1, 10}
totalTx = []int{10000}
Expand Down Expand Up @@ -512,16 +505,16 @@ func benchmarkOrderer(
// Experiment shows that atomic counter is not bottleneck.
assert.Equal(t, uint64(totalTx), txCount, "Expected to send %d msg, but actually sent %d", uint64(totalTx), txCount)

ordererType := os.Getenv(localconfig.Prefix + "_ORDERER_ORDERERTYPE")
ordererProfile := os.Getenv("ORDERER_GENERAL_GENESISPROFILE")

fmt.Printf(
"Message: %6d Message Size: %3dKB Channels: %3d Orderer(%s): %2d | "+
"Messages: %6d Message Size: %3dKB Channels: %3d Orderer (%s): %2d | "+
"Broadcast Clients: %3d Write tps: %5.1f tx/s Elapsed Time: %0.2fs | "+
"Deliver clients: %3d Read tps: %8.1f blk/s Elapsed Time: %0.2fs\n",
totalTx,
msgSize,
numOfChannels,
ordererType,
ordererProfile,
numOfOrderer,
broadcastClientPerChannel*numOfChannels*numOfOrderer,
float64(totalTx)/btime.Seconds(),
Expand Down

0 comments on commit e09a94c

Please sign in to comment.