Skip to content

Commit

Permalink
interop: implement rpc-behavior for UnaryCall() (#6575)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aditya-Sood authored Sep 27, 2023
1 parent c6264a9 commit fd9ef72
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 2 deletions.
2 changes: 1 addition & 1 deletion interop/xds/custom_lb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s) TestCustomLB(t *testing.T) {
}
rpcBMD := md.Get("rpc-behavior")
if len(rpcBMD) != 1 {
errCh.Send(fmt.Errorf("received %d values for metadata key rpc-behavior, want 1", len(rpcBMD)))
errCh.Send(fmt.Errorf("received %d values for metadata key \"rpc-behavior\", want 1", len(rpcBMD)))
return &testpb.SimpleResponse{}, nil
}
wantVal := "error-code-0"
Expand Down
110 changes: 109 additions & 1 deletion interop/xds/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@ import (
"log"
"net"
"os"
"strconv"
"strings"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/admin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health"
"google.golang.org/grpc/internal/status"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/xds"
Expand All @@ -53,6 +58,16 @@ var (
logger = grpclog.Component("interop")
)

const (
rpcBehaviorMDKey = "rpc-behavior"
grpcPreviousRPCAttemptsMDKey = "grpc-previous-rpc-attempts"
sleepPfx = "sleep-"
keepOpenVal = "keep-open"
errorCodePfx = "error-code-"
succeedOnRetryPfx = "succeed-on-retry-attempt-"
hostnamePfx = "hostname="
)

func getHostname() string {
if *hostNameOverride != "" {
return *hostNameOverride
Expand All @@ -78,8 +93,101 @@ func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*test
}

func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
response := &testpb.SimpleResponse{ServerId: s.serverID, Hostname: s.hostname}

forLoop:
for _, headerVal := range getRPCBehaviorMetadata(ctx) {
// A value can have a prefix "hostname=<string>" followed by a space.
// In that case, the rest of the value should only be applied
// if the specified hostname matches the server's hostname.
if strings.HasPrefix(headerVal, hostnamePfx) {
splitVal := strings.Split(headerVal, " ")
if len(splitVal) <= 1 {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for rpc-behavior header %v, must be 'hostname=<string> <header>=<value>' instead", headerVal)
}

if s.hostname != splitVal[0][len(hostnamePfx):] {
continue forLoop
}
headerVal = splitVal[1]
}

switch {
// If the value matches "sleep-<int>", the server should wait
// the specified number of seconds before resuming
// behavior matching and RPC processing.
case strings.HasPrefix(headerVal, sleepPfx):
sleep, err := strconv.Atoi(headerVal[len(sleepPfx):])
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for rpc-behavior header %v, must be 'sleep-<int>' instead", headerVal)
}
time.Sleep(time.Duration(sleep) * time.Second)

// If the value matches "keep-open", the server should
// never respond to the request and behavior matching ends.
case strings.HasPrefix(headerVal, keepOpenVal):
<-ctx.Done()
return nil, nil

// If the value matches "error-code-<int>", the server should
// respond with the specified status code and behavior matching ends.
case strings.HasPrefix(headerVal, errorCodePfx):
code, err := strconv.Atoi(headerVal[len(errorCodePfx):])
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for rpc-behavior header %v, must be 'error-code-<int>' instead", headerVal)
}
return nil, status.Errorf(codes.Code(code), "rpc failed as per the rpc-behavior header value: %v", headerVal)

// If the value matches "success-on-retry-attempt-<int>", and the
// value of the "grpc-previous-rpc-attempts" metadata field is equal to
// the specified number, the normal RPC processing should resume
// and behavior matching ends.
case strings.HasPrefix(headerVal, succeedOnRetryPfx):
wantRetry, err := strconv.Atoi(headerVal[len(succeedOnRetryPfx):])
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for rpc-behavior header %v, must be 'success-on-retry-attempt-<int>' instead", headerVal)
}

mdRetry := getMetadataValues(ctx, grpcPreviousRPCAttemptsMDKey)
curRetry, err := strconv.Atoi(mdRetry[0])
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for grpc-previous-rpc-attempts header: %v", mdRetry[0])
}

if curRetry == wantRetry {
break forLoop
}
}
}

grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
return &testpb.SimpleResponse{ServerId: s.serverID, Hostname: s.hostname}, nil
return response, status.Err(codes.OK, "")
}

func getRPCBehaviorMetadata(ctx context.Context) []string {
mdRPCBehavior := getMetadataValues(ctx, rpcBehaviorMDKey)
var rpcBehaviorMetadata []string
for _, mdVal := range mdRPCBehavior {
splitVals := strings.Split(mdVal, ",")

for _, val := range splitVals {
headerVal := strings.TrimSpace(val)
if headerVal == "" {
continue
}
rpcBehaviorMetadata = append(rpcBehaviorMetadata, headerVal)
}
}
return rpcBehaviorMetadata
}

func getMetadataValues(ctx context.Context, metadataKey string) []string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
logger.Error("Failed to retrieve metadata from incoming RPC context")
return nil
}
return md.Get(metadataKey)
}

// xdsUpdateHealthServiceImpl provides an implementation of the
Expand Down

0 comments on commit fd9ef72

Please sign in to comment.