Skip to content

Commit

Permalink
grpc: add streaming metrics (#3048)
Browse files Browse the repository at this point in the history
* add streaming metrics

Signed-off-by: Kuat Yessenov <kuat@google.com>

* revert change

Signed-off-by: Kuat Yessenov <kuat@google.com>

* lint

Signed-off-by: Kuat Yessenov <kuat@google.com>

* remove extra labels

Signed-off-by: Kuat Yessenov <kuat@google.com>

* do not report 0 counters

Signed-off-by: Kuat Yessenov <kuat@google.com>

* comments

Signed-off-by: Kuat Yessenov <kuat@google.com>

* rename count to total

Signed-off-by: Kuat Yessenov <kuat@google.com>
  • Loading branch information
kyessenov authored Oct 15, 2020
1 parent f478bcf commit 3317993
Show file tree
Hide file tree
Showing 22 changed files with 730 additions and 149 deletions.
19 changes: 19 additions & 0 deletions extensions/common/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ const char kPassThroughRouteName[] = "allow_any";
const char kInboundPassthroughClusterIpv4[] = "InboundPassthroughClusterIpv4";
const char kInboundPassthroughClusterIpv6[] = "InboundPassthroughClusterIpv6";

// Well-known name for the grpc_stats filter.
constexpr std::string_view GrpcStatsName = "envoy.filters.http.grpc_stats";

namespace {

// Extract service name from service host.
Expand Down Expand Up @@ -317,6 +320,7 @@ void populateHTTPRequestInfo(bool outbound, bool use_host_header_fallback,
kContentTypeHeaderKey)
->toString()) != 0) {
request_info->request_protocol = kProtocolGRPC;
populateGRPCInfo(request_info);
} else {
// TODO Add http/1.1, http/1.0, http/2 in a separate attribute.
// http|grpc classification is compatible with Mixerclient
Expand Down Expand Up @@ -391,6 +395,21 @@ void populateTCPRequestInfo(bool outbound, RequestInfo* request_info,
request_info->request_protocol = kProtocolTCP;
}

bool populateGRPCInfo(RequestInfo* request_info) {
std::string value;
if (!getValue({"filter_state", GrpcStatsName}, &value)) {
return false;
}
// The expected byte serialization of grpc_stats filter is "x,y" where "x"
// is the request message count and "y" is the response message count.
std::vector<std::string_view> parts = absl::StrSplit(value, ',');
if (parts.size() == 2) {
return absl::SimpleAtoi(parts[0], &request_info->request_message_count) &&
absl::SimpleAtoi(parts[1], &request_info->response_message_count);
}
return false;
}

bool getAuditPolicy() {
bool shouldAudit = false;
if (!getValue<bool>(
Expand Down
8 changes: 8 additions & 0 deletions extensions/common/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ struct RequestInfo {

bool is_populated = false;
bool log_sampled = false;

// gRPC variables.
uint64_t request_message_count = 0;
uint64_t response_message_count = 0;
};

// RequestContext contains all the information available in the request.
Expand Down Expand Up @@ -231,6 +235,10 @@ void populateExtendedRequestInfo(RequestInfo* request_info);
void populateTCPRequestInfo(bool outbound, RequestInfo* request_info,
const std::string& destination_namespace);

// populateGRPCInfo fills gRPC-related information, such as message counts.
// Returns true if all information is filled.
bool populateGRPCInfo(RequestInfo* request_info);

// Read value of 'access_log_hint' key from envoy dynamic metadata which
// determines whether to audit a request or not.
bool getAuditPolicy();
Expand Down
39 changes: 26 additions & 13 deletions extensions/stats/plugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,46 +192,58 @@ const std::vector<MetricFactory>& PluginRootContext::defaultMetrics() {
// HTTP, HTTP/2, and GRPC metrics
MetricFactory{
"requests_total", MetricType::Counter,

[](const ::Wasm::Common::RequestInfo&) -> uint64_t { return 1; },
false},
false, count_standard_labels},
MetricFactory{
"request_duration_milliseconds", MetricType::Histogram,
[](const ::Wasm::Common::RequestInfo& request_info) -> uint64_t {
return request_info.duration /* in nanoseconds */ / 1000000;
},
false},
false, count_standard_labels},
MetricFactory{"request_bytes", MetricType::Histogram,

[](const ::Wasm::Common::RequestInfo& request_info)
-> uint64_t { return request_info.request_size; },
false},
false, count_standard_labels},
MetricFactory{"response_bytes", MetricType::Histogram,

[](const ::Wasm::Common::RequestInfo& request_info)
-> uint64_t { return request_info.response_size; },
false},
false, count_standard_labels},
// GRPC streaming metrics.
// These metrics are dimensioned by peer labels as a minimum.
// TODO: consider adding connection security policy
MetricFactory{
"request_messages_total", MetricType::Counter,
[](const ::Wasm::Common::RequestInfo& request_info) -> uint64_t {
return request_info.request_message_count;
},
false, count_peer_labels},
MetricFactory{
"response_messages_total", MetricType::Counter,
[](const ::Wasm::Common::RequestInfo& request_info) -> uint64_t {
return request_info.response_message_count;
},
false, count_peer_labels},
// TCP metrics.
MetricFactory{"tcp_sent_bytes_total", MetricType::Counter,
[](const ::Wasm::Common::RequestInfo& request_info)
-> uint64_t { return request_info.tcp_sent_bytes; },
true},
true, count_standard_labels},
MetricFactory{"tcp_received_bytes_total", MetricType::Counter,
[](const ::Wasm::Common::RequestInfo& request_info)
-> uint64_t { return request_info.tcp_received_bytes; },
true},
true, count_standard_labels},
MetricFactory{
"tcp_connections_opened_total", MetricType::Counter,
[](const ::Wasm::Common::RequestInfo& request_info) -> uint64_t {
return request_info.tcp_connections_opened;
},
true},
true, count_standard_labels},
MetricFactory{
"tcp_connections_closed_total", MetricType::Counter,
[](const ::Wasm::Common::RequestInfo& request_info) -> uint64_t {
return request_info.tcp_connections_closed;
},
true},
true, count_standard_labels},
};
return default_metrics;
}
Expand All @@ -252,8 +264,9 @@ bool PluginRootContext::initializeDimensions(const json& j) {
const std::vector<MetricTag>& default_tags = defaultTags();
for (const auto& factory : defaultMetrics()) {
factories[factory.name] = factory;
metric_tags[factory.name] = default_tags;
for (size_t i = 0; i < count_standard_labels; i++) {
metric_tags[factory.name] = std::vector<MetricTag>(
default_tags.begin(), default_tags.begin() + factory.count_labels);
for (size_t i = 0; i < factory.count_labels; i++) {
metric_indexes[factory.name][default_tags[i].name] = i;
}
}
Expand Down
18 changes: 14 additions & 4 deletions extensions/stats/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ STD_ISTIO_DIMENSIONS(DECLARE_CONSTANT)
const size_t count_standard_labels =
static_cast<size_t>(StandardLabels::xxx_last_metric);

const size_t count_peer_labels =
static_cast<size_t>(StandardLabels::destination_canonical_revision) + 1;

struct HashIstioDimensions {
size_t operator()(const IstioDimensions& c) const {
const size_t kMul = static_cast<size_t>(0x9ddfea08eb382d69);
Expand All @@ -115,17 +118,23 @@ using ValueExtractorFn =
// SimpleStat record a pre-resolved metric based on the values function.
class SimpleStat {
public:
SimpleStat(uint32_t metric_id, ValueExtractorFn value_fn)
: metric_id_(metric_id), value_fn_(value_fn){};
SimpleStat(uint32_t metric_id, ValueExtractorFn value_fn, MetricType type)
: metric_id_(metric_id), value_fn_(value_fn), type_(type){};

inline void record(const ::Wasm::Common::RequestInfo& request_info) {
recordMetric(metric_id_, value_fn_(request_info));
const uint64_t val = value_fn_(request_info);
// Optimization: do not record 0 COUNTER values
if (type_ == MetricType::Counter && val == 0) {
return;
}
recordMetric(metric_id_, val);
};

uint32_t metric_id_;

private:
ValueExtractorFn value_fn_;
MetricType type_;
};

// MetricFactory creates a stat generator given tags.
Expand All @@ -134,6 +143,7 @@ struct MetricFactory {
MetricType type;
ValueExtractorFn extractor;
bool is_tcp;
size_t count_labels;
};

// StatGen creates a SimpleStat based on resolved metric_id.
Expand Down Expand Up @@ -190,7 +200,7 @@ class StatGen {
}
n.append(metric_.name);
auto metric_id = metric_.resolveFullName(n);
return SimpleStat(metric_id, extractor_);
return SimpleStat(metric_id, extractor_, metric_.type);
};

private:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ require (
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d // indirect
golang.org/x/text v0.3.2 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.28.0
google.golang.org/protobuf v1.25.0 // indirect
google.golang.org/grpc v1.32.0
google.golang.org/protobuf v1.25.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v2 v2.2.8
istio.io/proxy/test/envoye2e/stackdriver_plugin/edges v0.0.0-20200916170758-74d763048fa1
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2El
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20201012135029-0c95dc0d88e8 h1:SvhzmDbMVK7pK0Fe7KMt2mHoIXxBZNfHQPRqfJFBbnY=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0 h1:AzbTB6ux+okLTzP8Ru1Xs41C303zdcfEht7MQnYJt5A=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand All @@ -171,6 +172,8 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.28.0 h1:bO/TA4OxCOummhSf10siHuG7vJOiwh7SpRpFZDkOgl4=
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0=
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
39 changes: 39 additions & 0 deletions test/envoye2e/driver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,42 @@ func (g *GrpcCall) Run(p *Params) error {
}

func (g *GrpcCall) Cleanup() {}

var _ Step = &GrpcStream{}

type GrpcStream struct {
Counts []uint32
}

func (g *GrpcStream) Run(p *Params) error {
proxyAddr := fmt.Sprintf("127.0.0.1:%d", p.Ports.ClientPort)
conn, err := grpc.Dial(proxyAddr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return fmt.Errorf("could not establish client connection to gRPC server: %v", err)
}
defer conn.Close()
client := grpc_echo.NewEchoClient(conn)

stream, err := client.EchoStream(context.Background())
if err != nil {
return err
}

for i := 0; i < len(g.Counts); i++ {
count := g.Counts[i]
fmt.Printf("requesting %v messages at %v stream message\n", count, i)
err := stream.Send(&grpc_echo.StreamRequest{ResponseCount: count})
if err != nil {
return err
}
for j := 0; j < int(count); j++ {
_, err = stream.Recv()
if err != nil {
return err
}
}
}
return nil
}

func (g *GrpcStream) Cleanup() {}
21 changes: 21 additions & 0 deletions test/envoye2e/env/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package env
import (
"context"
"fmt"
"io"
"log"
"net"
"time"
Expand All @@ -33,6 +34,8 @@ import (
type GRPCServer struct {
port uint16
listener net.Listener

grpc_echo.UnimplementedEchoServer
}

// NewGRPCServer configures a new GRPCServer. It does not attempt to
Expand Down Expand Up @@ -75,6 +78,24 @@ func (g *GRPCServer) Echo(ctx context.Context, req *grpc_echo.EchoRequest) (*emp
return &empty.Empty{}, status.FromProto(req.ReturnStatus).Err()
}

func (g *GRPCServer) EchoStream(stream grpc_echo.Echo_EchoStreamServer) error {
var i uint32
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
for i = 0; i < req.ResponseCount; i++ {
if err = stream.Send(&empty.Empty{}); err != nil {
return err
}
}
}
}

func tryWaitForGRPCServer(addr string) error {
for i := 0; i < 10; i++ {
log.Println("Attempting to establish connection to gRPC server: ", addr)
Expand Down
Loading

0 comments on commit 3317993

Please sign in to comment.