interop: improve rpc_soak and channel_soak test to cover concurrency in Go (#8025)
zbilun authored Jan 23, 2025
1 parent 8cf8fd1 commit 897818a
Showing 6 changed files with 268 additions and 114 deletions.
33 changes: 30 additions & 3 deletions interop/client/client.go
@@ -28,6 +28,7 @@ import (
"crypto/tls"
"crypto/x509"
"flag"
"log"
"net"
"os"
"strconv"
@@ -79,6 +80,7 @@ var (
soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
soakNumThreads = flag.Int("soak_num_threads", 1, "The number of threads for concurrent execution of the soak tests (rpc_soak or channel_soak).")
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
additionalMetadata = flag.String("additional_metadata", "", "Additional metadata to send in each request, as a semicolon-separated list of key:value pairs.")
testCase = flag.String("test_case", "large_unary",
@@ -149,6 +151,21 @@ func parseAdditionalMetadataFlag() []string {
return addMd
}

// createBaseSoakConfig creates the soak test configuration shared by rpc_soak and channel_soak.
func createBaseSoakConfig(serverAddr string) interop.SoakTestConfig {
return interop.SoakTestConfig{
RequestSize: *soakRequestSize,
ResponseSize: *soakResponseSize,
PerIterationMaxAcceptableLatency: time.Duration(*soakPerIterationMaxAcceptableLatencyMs) * time.Millisecond,
MinTimeBetweenRPCs: time.Duration(*soakMinTimeMsBetweenRPCs) * time.Millisecond,
OverallTimeout: time.Duration(*soakOverallTimeoutSeconds) * time.Second,
ServerAddr: serverAddr,
NumWorkers: *soakNumThreads,
Iterations: *soakIterations,
MaxFailures: *soakMaxFailures,
}
}

func main() {
flag.Parse()
logger.Infof("Client running with test case %q", *testCase)
@@ -261,7 +278,7 @@ func main() {
}
opts = append(opts, grpc.WithUnaryInterceptor(unaryAddMd), grpc.WithStreamInterceptor(streamingAddMd))
}
conn, err := grpc.Dial(serverAddr, opts...)
conn, err := grpc.NewClient(serverAddr, opts...)
if err != nil {
logger.Fatalf("Fail to dial: %v", err)
}
@@ -358,10 +375,20 @@ func main() {
interop.DoPickFirstUnary(ctx, tc)
logger.Infoln("PickFirstUnary done")
case "rpc_soak":
interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
rpcSoakConfig := createBaseSoakConfig(serverAddr)
rpcSoakConfig.ChannelForTest = func() (*grpc.ClientConn, func()) { return conn, func() {} }
interop.DoSoakTest(ctxWithDeadline, rpcSoakConfig)
logger.Infoln("RpcSoak done")
case "channel_soak":
interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
channelSoakConfig := createBaseSoakConfig(serverAddr)
channelSoakConfig.ChannelForTest = func() (*grpc.ClientConn, func()) {
cc, err := grpc.NewClient(serverAddr, opts...)
if err != nil {
log.Fatalf("Failed to create shared channel: %v", err)
}
return cc, func() { cc.Close() }
}
interop.DoSoakTest(ctxWithDeadline, channelSoakConfig)
logger.Infoln("ChannelSoak done")
case "orca_per_rpc":
interop.DoORCAPerRPCTest(ctx, tc)
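Side note on the first client.go hunk above: the switch from grpc.Dial to grpc.NewClient also changes when the connection is established. A minimal sketch of the difference, assuming grpc-go 1.63 or later (serverAddr and opts as in the diff):

	// grpc.NewClient does not connect eagerly; the underlying connection is
	// created lazily on the first RPC, or can be started explicitly via Connect.
	conn, err := grpc.NewClient(serverAddr, opts...)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer conn.Close()
	conn.Connect() // optional: start connecting before the first RPC (experimental API)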
2 changes: 2 additions & 0 deletions interop/interop_test.sh
@@ -92,6 +92,8 @@ CASES=(
"unimplemented_service"
"orca_per_rpc"
"orca_oob"
"rpc_soak"
"channel_soak"
)

# Build server
202 changes: 202 additions & 0 deletions interop/soak_tests.go
@@ -0,0 +1,202 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package interop

import (
"bytes"
"context"
"fmt"
"os"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/peer"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

// SoakWorkerResults stores the aggregated results for a specific worker during the soak test.
type SoakWorkerResults struct {
IterationsDone int
Failures int
Latencies *stats.Histogram
}

// SoakIterationConfig holds the parameters required for a single soak iteration.
type SoakIterationConfig struct {
RequestSize int // The size of the request payload in bytes.
ResponseSize int // The expected size of the response payload in bytes.
Client testgrpc.TestServiceClient // The gRPC client to make the call.
CallOptions []grpc.CallOption // Call options for the RPC.
}

// SoakTestConfig holds the configuration for the entire soak test.
type SoakTestConfig struct {
RequestSize int // Request payload size in bytes.
ResponseSize int // Expected response payload size in bytes.
PerIterationMaxAcceptableLatency time.Duration // Latency above which an iteration counts as a failure.
MinTimeBetweenRPCs time.Duration // Minimum time between the starts of consecutive RPCs on a worker.
OverallTimeout time.Duration // Wall-clock budget for the whole test.
ServerAddr string // Server address, used in log messages.
NumWorkers int // Number of concurrent workers (goroutines).
Iterations int // Total number of iterations, split evenly across workers.
MaxFailures int // Threshold on total failed iterations; exceeding it is reported at the end.
ChannelForTest func() (*grpc.ClientConn, func()) // Returns the channel for an iteration and a cleanup function.
}

func doOneSoakIteration(ctx context.Context, config SoakIterationConfig) (latency time.Duration, err error) {
start := time.Now()
// Do a large-unary RPC.
// Create the request payload.
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, config.RequestSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(config.ResponseSize),
Payload: pl,
}
// Perform the gRPC call.
var reply *testpb.SimpleResponse
reply, err = config.Client.UnaryCall(ctx, req, config.CallOptions...)
if err != nil {
err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
return 0, err
}
// Validate response.
t := reply.GetPayload().GetType()
s := len(reply.GetPayload().GetBody())
if t != testpb.PayloadType_COMPRESSABLE || s != config.ResponseSize {
err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, config.ResponseSize)
return 0, err
}
// Calculate latency and return result.
latency = time.Since(start)
return latency, nil
}

func executeSoakTestInWorker(ctx context.Context, config SoakTestConfig, startTime time.Time, workerID int, soakWorkerResults *SoakWorkerResults) {
timeoutDuration := config.OverallTimeout
soakIterationsPerWorker := config.Iterations / config.NumWorkers
if soakWorkerResults.Latencies == nil {
soakWorkerResults.Latencies = stats.NewHistogram(stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
BaseBucketSize: 1,
MinValue: 0,
})
}

for i := 0; i < soakIterationsPerWorker; i++ {
if ctx.Err() != nil {
return
}
if time.Since(startTime) >= timeoutDuration {
fmt.Printf("Test exceeded overall timeout of %v, stopping...\n", config.OverallTimeout)
return
}
earliestNextStart := time.After(config.MinTimeBetweenRPCs)
currentChannel, cleanup := config.ChannelForTest()
defer cleanup()
client := testgrpc.NewTestServiceClient(currentChannel)
var p peer.Peer
iterationConfig := SoakIterationConfig{
RequestSize: config.RequestSize,
ResponseSize: config.ResponseSize,
Client: client,
CallOptions: []grpc.CallOption{grpc.Peer(&p)},
}
latency, err := doOneSoakIteration(ctx, iterationConfig)
if err != nil {
fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", workerID, i, 0, p.Addr, config.ServerAddr, err)
soakWorkerResults.Failures++
<-earliestNextStart
continue
}
if latency > config.PerIterationMaxAcceptableLatency {
fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s exceeds max acceptable latency: %d\n", workerID, i, latency, p.Addr, config.ServerAddr, config.PerIterationMaxAcceptableLatency.Milliseconds())
soakWorkerResults.Failures++
<-earliestNextStart
continue
}
// Success: log the details of the iteration.
soakWorkerResults.Latencies.Add(latency.Milliseconds())
soakWorkerResults.IterationsDone++
fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s succeeded\n", workerID, i, latency, p.Addr, config.ServerAddr)
<-earliestNextStart
}
}
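The <-earliestNextStart receives above are what enforce MinTimeBetweenRPCs: the timer is started before the RPC is issued, so the wait at the end of each branch only covers whatever part of the interval the RPC itself did not already use. The same pacing idiom in isolation (n, minGap and work are placeholders):

	for i := 0; i < n; i++ {
		earliestNextStart := time.After(minGap) // start the interval timer before the work
		work()                                  // the RPC, in the soak test's case
		<-earliestNextStart                     // block only for the remainder of minGap, if any
	}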

// DoSoakTest runs large unary RPCs in a loop for a configurable number of iterations, split across a configurable
// number of concurrent workers, with configurable failure thresholds. Each iteration obtains its channel from
// soakConfig.ChannelForTest: rpc_soak reuses a single shared channel, while channel_soak creates a new channel
// (and tears it down) for every iteration.
func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) {
if soakConfig.Iterations%soakConfig.NumWorkers != 0 {
fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by soakNumWThreads\n")
}
startTime := time.Now()
var wg sync.WaitGroup
soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers)
for i := 0; i < soakConfig.NumWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID])
}(i)
}
// Wait for all goroutines to complete.
wg.Wait()

// Handle results.
totalIterations := 0
totalFailures := 0
latencies := stats.NewHistogram(stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
BaseBucketSize: 1,
MinValue: 0,
})
for _, worker := range soakWorkerResults {
totalIterations += worker.IterationsDone
totalFailures += worker.Failures
if worker.Latencies != nil {
// Add latencies from the worker's Histogram to the main latencies.
latencies.Merge(worker.Latencies)
}
}
var b bytes.Buffer
latencies.Print(&b)
fmt.Fprintf(os.Stderr,
"(server_uri: %s) soak test ran: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n",
soakConfig.ServerAddr, totalIterations, soakConfig.Iterations, totalFailures, b.String())

if totalIterations != soakConfig.Iterations {
fmt.Fprintf(os.Stderr, "Soak test consumed all %v of time and quit early, ran %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations)
}

if totalFailures > soakConfig.MaxFailures {
fmt.Fprintf(os.Stderr, "Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures)
}
if soakConfig.ChannelForTest != nil {
_, cleanup := soakConfig.ChannelForTest()
defer cleanup()
}
}
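For readers who want to drive the new API outside of interop/client, a minimal hypothetical caller could look like the following; addr, the dial options, and all numeric values are illustrative, and an interop test server is assumed to already be listening at addr. This sketch uses a channel_soak-style ChannelForTest; an rpc_soak-style caller would instead return one shared *grpc.ClientConn with a no-op cleanup.

	package main

	import (
		"context"
		"log"
		"time"

		"google.golang.org/grpc"
		"google.golang.org/grpc/credentials/insecure"
		"google.golang.org/grpc/interop"
	)

	func main() {
		addr := "localhost:10000" // assumed interop server address
		opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		defer cancel()
		interop.DoSoakTest(ctx, interop.SoakTestConfig{
			RequestSize:                      271828,
			ResponseSize:                     314159,
			PerIterationMaxAcceptableLatency: time.Second,
			MinTimeBetweenRPCs:               10 * time.Millisecond,
			OverallTimeout:                   time.Minute,
			ServerAddr:                       addr,
			NumWorkers:                       2,
			Iterations:                       20, // must be evenly divisible by NumWorkers
			MaxFailures:                      0,
			// channel_soak style: a fresh channel per iteration, closed afterwards.
			ChannelForTest: func() (*grpc.ClientConn, func()) {
				cc, err := grpc.NewClient(addr, opts...)
				if err != nil {
					log.Fatalf("Failed to create channel: %v", err)
				}
				return cc, func() { cc.Close() }
			},
		})
	}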