Skip to content

Commit

Permalink
v1.38.x: backport (#4453)
Browse files Browse the repository at this point in the history
* interop/xds: support xds security on interop server (#4444)
* interop/xds: dockerfile for the xds interop client (#4443)
* xds/cds: add env var for aggregated and DNS cluster (#4440)
* xds: use same format while registering and watching resources (#4422)
* client: fix ForceCodec to set content-type header appropriately (#4401)
  • Loading branch information
menghanl authored May 18, 2021
1 parent 5f95ad6 commit ce3e5ec
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 31 deletions.
20 changes: 15 additions & 5 deletions internal/xds/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ const (
// and kept in variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"

circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"

c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
Expand All @@ -60,6 +62,7 @@ var (
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)

// CircuitBreakingSupport indicates whether circuit breaking support is
// enabled, which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false".
Expand All @@ -71,17 +74,24 @@ var (
// FaultInjectionSupport is used to control both fault injection and HTTP
// filter support.
FaultInjectionSupport = !strings.EqualFold(os.Getenv(faultInjectionSupportEnv), "false")
// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// ClientSideSecuritySupport is used to control processing of security
// configuration on the client-side.
//
// Note that there is no env var protection for the server-side because we
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
ClientSideSecuritySupport = strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "true")
// AggregateAndDNSSupportEnv indicates whether processing of aggregated
// cluster and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")

// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
)
34 changes: 34 additions & 0 deletions interop/xds/client/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Dockerfile for building the xDS interop client. To build the image, run the
# following command from grpc-go directory:
# docker build -t <TAG> -f interop/xds/client/Dockerfile .

FROM golang:1.16-alpine as build

# Make a grpc-go directory and copy the repo into it.
WORKDIR /go/src/grpc-go
COPY . .

# Build a static binary without cgo so that we can copy just the binary in the
# final image, and can get rid of Go compiler and gRPC-Go dependencies.
RUN go build -tags osusergo,netgo interop/xds/client/client.go

# Second stage of the build which copies over only the client binary and skips
# the Go compiler and gRPC repo from the earlier stage. This significantly
# reduces the docker image size.
FROM alpine
COPY --from=build /go/src/grpc-go/client .
ENTRYPOINT ["./client"]
34 changes: 34 additions & 0 deletions interop/xds/server/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Dockerfile for building the xDS interop server. To build the image, run the
# following command from grpc-go directory:
# docker build -t <TAG> -f interop/xds/server/Dockerfile .

FROM golang:1.16-alpine as build

# Make a grpc-go directory and copy the repo into it.
WORKDIR /go/src/grpc-go
COPY . .

# Build a static binary without cgo so that we can copy just the binary in the
# final image, and can get rid of the Go compiler and gRPC-Go dependencies.
RUN go build -tags osusergo,netgo interop/xds/server/server.go

# Second stage of the build which copies over only the client binary and skips
# the Go compiler and gRPC repo from the earlier stage. This significantly
# reduces the docker image size.
FROM alpine
COPY --from=build /go/src/grpc-go/server .
ENTRYPOINT ["./server"]
142 changes: 124 additions & 18 deletions interop/xds/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright 2020 gRPC authors.
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,29 +16,37 @@
*
*/

// Binary server for xDS interop tests.
// Binary server is the server used for xDS interop tests.
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"strconv"

"google.golang.org/grpc"
"google.golang.org/grpc/admin"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/xds"

xdscreds "google.golang.org/grpc/credentials/xds"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

var (
port = flag.Int("port", 8080, "The server port")
serverID = flag.String("server_id", "go_server", "Server ID included in response")
hostname = getHostname()
port = flag.Int("port", 8080, "Listening port for test service")
maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port")
serverID = flag.String("server_id", "go_server", "Server ID included in response")
secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")

logger = grpclog.Component("interop")
)
Expand All @@ -51,28 +59,126 @@ func getHostname() string {
return hostname
}

type server struct {
// testServiceImpl provides an implementation of the TestService defined in
// grpc.testing package.
type testServiceImpl struct {
testgrpc.UnimplementedTestServiceServer
hostname string
}

func (s *server) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
grpc.SetHeader(ctx, metadata.Pairs("hostname", hostname))
func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
return &testpb.Empty{}, nil
}

func (s *server) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
grpc.SetHeader(ctx, metadata.Pairs("hostname", hostname))
return &testpb.SimpleResponse{ServerId: *serverID, Hostname: hostname}, nil
func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
return &testpb.SimpleResponse{ServerId: *serverID, Hostname: s.hostname}, nil
}

// xdsUpdateHealthServiceImpl provides an implementation of the
// XdsUpdateHealthService defined in grpc.testing package.
type xdsUpdateHealthServiceImpl struct {
testgrpc.UnimplementedXdsUpdateHealthServiceServer
healthServer *health.Server
}

func (x *xdsUpdateHealthServiceImpl) SetServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
return &testpb.Empty{}, nil

}

func (x *xdsUpdateHealthServiceImpl) SetNotServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
return &testpb.Empty{}, nil
}

func xdsServingModeCallback(addr net.Addr, args xds.ServingModeChangeArgs) {
logger.Infof("Serving mode for xDS server at %s changed to %s", addr.String(), args.Mode)
if args.Err != nil {
logger.Infof("ServingModeCallback returned error: %v", args.Err)
}
}

func main() {
flag.Parse()
p := strconv.Itoa(*port)
lis, err := net.Listen("tcp", ":"+p)

if *secureMode && *port == *maintenancePort {
logger.Fatal("-port and -maintenance_port must be different when -secure_mode is set")
}

testService := &testServiceImpl{hostname: getHostname()}
healthServer := health.NewServer()
updateHealthService := &xdsUpdateHealthServiceImpl{healthServer: healthServer}

// If -secure_mode is not set, expose all services on -port with a regular
// gRPC server.
if !*secureMode {
lis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *port))
if err != nil {
logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *port), err)
}

server := grpc.NewServer()
testgrpc.RegisterTestServiceServer(server, testService)
healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(server, healthServer)
testgrpc.RegisterXdsUpdateHealthServiceServer(server, updateHealthService)
reflection.Register(server)
cleanup, err := admin.Register(server)
if err != nil {
logger.Fatalf("Failed to register admin services: %v", err)
}
defer cleanup()
if err := server.Serve(lis); err != nil {
logger.Errorf("Serve() failed: %v", err)
}
return
}

// Create a listener on -port to expose the test service.
testLis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *port))
if err != nil {
logger.Fatalf("failed to listen: %v", err)
logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *port), err)
}

// Create server-side xDS credentials with a plaintext fallback.
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
if err != nil {
logger.Fatalf("Failed to create xDS credentials: %v", err)
}

// Create an xDS enabled gRPC server, register the test service
// implementation and start serving.
testServer := xds.NewGRPCServer(grpc.Creds(creds), xds.ServingModeCallback(xdsServingModeCallback))
testgrpc.RegisterTestServiceServer(testServer, testService)
go func() {
if err := testServer.Serve(testLis); err != nil {
logger.Errorf("test server Serve() failed: %v", err)
}
}()
defer testServer.Stop()

// Create a listener on -maintenance_port to expose other services.
maintenanceLis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *maintenancePort))
if err != nil {
logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *maintenancePort), err)
}

// Create a regular gRPC server and register the maintenance services on
// it and start serving.
maintenanceServer := grpc.NewServer()
healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(maintenanceServer, healthServer)
testgrpc.RegisterXdsUpdateHealthServiceServer(maintenanceServer, updateHealthService)
reflection.Register(maintenanceServer)
cleanup, err := admin.Register(maintenanceServer)
if err != nil {
logger.Fatalf("Failed to register admin services: %v", err)
}
defer cleanup()
if err := maintenanceServer.Serve(maintenanceLis); err != nil {
logger.Errorf("maintenance server Serve() failed: %v", err)
}
s := grpc.NewServer()
testgrpc.RegisterTestServiceServer(s, &server{})
s.Serve(lis)
}
19 changes: 15 additions & 4 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,10 @@ func (o ContentSubtypeCallOption) before(c *callInfo) error {
}
func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}

// ForceCodec returns a CallOption that will set codec to be
// used for all request and response messages for a call. The result of calling
// Name() will be used as the content-subtype in a case-insensitive manner.
// ForceCodec returns a CallOption that will set codec to be used for all
// request and response messages for a call. The result of calling Name() will
// be used as the content-subtype after converting to lowercase, unless
// CallContentSubtype is also used.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
Expand Down Expand Up @@ -853,7 +854,17 @@ func toRPCErr(err error) error {
// setCallInfoCodec should only be called after CallOptions have been applied.
func setCallInfoCodec(c *callInfo) error {
if c.codec != nil {
// codec was already set by a CallOption; use it.
// codec was already set by a CallOption; use it, but set the content
// subtype if it is not set.
if c.contentSubtype == "" {
// c.codec is a baseCodec to hide the difference between grpc.Codec and
// encoding.Codec (Name vs. String method name). We only support
// setting content subtype from encoding.Codec to avoid a behavior
// change with the deprecated version.
if ec, ok := c.codec.(encoding.Codec); ok {
c.contentSubtype = strings.ToLower(ec.Name())
}
}
return nil
}

Expand Down
Loading

0 comments on commit ce3e5ec

Please sign in to comment.