Skip to content

Commit

Permalink
xds: switching to stubserver in tests instead of testservice implemen…
Browse files Browse the repository at this point in the history
…tation (#7726)
  • Loading branch information
janardhanvissa authored Nov 13, 2024
1 parent b01130a commit 8c518f7
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 76 deletions.
62 changes: 25 additions & 37 deletions xds/internal/httpfilter/fault/fault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -67,25 +68,6 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

type testService struct {
testgrpc.TestServiceServer
}

func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}

func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
// End RPC after client does a CloseSend.
for {
if _, err := stream.Recv(); err == io.EOF {
return nil
} else if err != nil {
return err
}
}
}

// clientSetup performs a bunch of steps common to all xDS server tests here:
// - spin up an xDS management server on a local port
// - spin up a gRPC server and register the test service on it
Expand All @@ -97,7 +79,7 @@ func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSer
// sent by the xdsClient for queries.
// - the port the server is listening on
// - cleanup function to be invoked by the tests when done
func clientSetup(t *testing.T) (*e2e.ManagementServer, string, uint32, func()) {
func clientSetup(t *testing.T) (*e2e.ManagementServer, string, uint32) {
// Spin up a xDS management server on a local port.
nodeID := uuid.New().String()
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
Expand All @@ -106,25 +88,33 @@ func clientSetup(t *testing.T) (*e2e.ManagementServer, string, uint32, func()) {
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
testutils.CreateBootstrapFileForTesting(t, bootstrapContents)

// Initialize a gRPC server and register the stubServer on it.
server := grpc.NewServer()
testgrpc.RegisterTestServiceServer(server, &testService{})

// Create a local listener and pass it to Serve().
// Create a local listener.
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}

go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()

return managementServer, nodeID, uint32(lis.Addr().(*net.TCPAddr).Port), func() {
server.Stop()
// Initialize a test gRPC server, assign it to the stub server, and start the test service.
stub := &stubserver.StubServer{
Listener: lis,
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
// End RPC after client does a CloseSend.
for {
if _, err := stream.Recv(); err == io.EOF {
return nil
} else if err != nil {
return err
}
}
},
}

stubserver.StartTestService(t, stub)
t.Cleanup(stub.S.Stop)
return managementServer, nodeID, uint32(lis.Addr().(*net.TCPAddr).Port)
}

func (s) TestFaultInjection_Unary(t *testing.T) {
Expand Down Expand Up @@ -466,8 +456,7 @@ func (s) TestFaultInjection_Unary(t *testing.T) {
}},
}}

fs, nodeID, port, cleanup := clientSetup(t)
defer cleanup()
fs, nodeID, port := clientSetup(t)

for tcNum, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down Expand Up @@ -549,8 +538,7 @@ func (s) TestFaultInjection_Unary(t *testing.T) {
}

func (s) TestFaultInjection_MaxActiveFaults(t *testing.T) {
fs, nodeID, port, cleanup := clientSetup(t)
defer cleanup()
fs, nodeID, port := clientSetup(t)
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: "myservice",
NodeID: nodeID,
Expand Down
82 changes: 43 additions & 39 deletions xds/server_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -66,27 +67,6 @@ const (
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

type testService struct {
testgrpc.UnimplementedTestServiceServer
}

func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}

func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
}

func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
}

func hostPortFromListener(lis net.Listener) (string, uint32, error) {
host, p, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
Expand Down Expand Up @@ -145,17 +125,29 @@ func (s) TestServingModeChanges(t *testing.T) {
}
})

server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
stub := &stubserver.StubServer{
Listener: lis,
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
},
}
sopts := []grpc.ServerOption{grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)}
if stub.S, err = xds.NewGRPCServer(sopts...); err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
stubserver.StartTestService(t, stub)
defer stub.S.Stop()
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
Expand Down Expand Up @@ -271,17 +263,29 @@ func (s) TestResourceNotFoundRDS(t *testing.T) {
}
})

server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
stub := &stubserver.StubServer{
Listener: lis,
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
},
}
sopts := []grpc.ServerOption{grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)}
if stub.S, err = xds.NewGRPCServer(sopts...); err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
stubserver.StartTestService(t, stub)
defer stub.S.Stop()

cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
Expand Down

0 comments on commit 8c518f7

Please sign in to comment.