Skip to content

Commit

Permalink
feat(tracemode): Tracemode v1 (#4030)
Browse files Browse the repository at this point in the history
* feat(tracemode): Tracemode v1

* updates to trace mode

* updates to trace mode

* removing code

* cleanup
  • Loading branch information
xoscar authored Sep 30, 2024
1 parent 646ea05 commit bf6f525
Show file tree
Hide file tree
Showing 35 changed files with 4,311 additions and 790 deletions.
23 changes: 23 additions & 0 deletions agent/client/workflow_send_trace_mode_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package client

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
)

func (c *Client) SendTraces(ctx context.Context, response *proto.ExportRequest) error {
client := proto.NewOrchestratorClient(c.conn)

response.AgentIdentification = c.sessionConfig.AgentIdentification
response.Metadata = telemetry.ExtractMetadataFromContext(ctx)

_, err := client.Export(ctx, response)
if err != nil {
return fmt.Errorf("could not send list traces result request: %w", err)
}

return nil
}
14 changes: 13 additions & 1 deletion agent/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func WithTraceCache(traceCache TraceCache) CollectorOption {
}
}

func WithTraceMode(traceMode bool) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.traceMode = traceMode
}
}

func WithStartRemoteServer(startRemoteServer bool) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.startRemoteServer = startRemoteServer
Expand All @@ -57,6 +63,12 @@ func WithSensor(sensor sensors.Sensor) CollectorOption {
}
}

func WithTraceModeForwarder(traceModeForwarder TraceModeForwarder) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.traceModeForwarder = traceModeForwarder
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand Down Expand Up @@ -108,7 +120,7 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll
opt(&ingesterConfig)
}

ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig, ingesterConfig.startRemoteServer)
ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig, ingesterConfig.traceModeForwarder, ingesterConfig.startRemoteServer)
if err != nil {
return nil, fmt.Errorf("could not start local collector: %w", err)
}
Expand Down
77 changes: 54 additions & 23 deletions agent/collector/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"github.com/kubeshop/tracetest/agent/ui/dashboard/events"
"github.com/kubeshop/tracetest/agent/ui/dashboard/sensors"
"github.com/kubeshop/tracetest/server/otlp"
"github.com/kubeshop/tracetest/server/traces"
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v11 "go.opentelemetry.io/proto/otlp/common/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/zap"
)
Expand All @@ -29,15 +31,20 @@ type ingester interface {
SetSensor(sensors.Sensor)
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, startRemoteServer bool) (ingester, error) {
type TraceModeForwarder interface {
Export(ctx context.Context, request *pb.ExportTraceServiceRequest) error
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, traceModeForwarder TraceModeForwarder, startRemoteServer bool) (ingester, error) {
ingester := &forwardIngester{
BatchTimeout: batchTimeout,
RemoteIngester: cfg,
traceIDs: make(map[string]bool, 0),
done: make(chan bool),
traceCache: cfg.traceCache,
logger: cfg.logger,
sensor: cfg.sensor,
BatchTimeout: batchTimeout,
RemoteIngester: cfg,
traceIDs: make(map[string]bool, 0),
done: make(chan bool),
traceCache: cfg.traceCache,
logger: cfg.logger,
sensor: cfg.sensor,
traceModeForwarder: traceModeForwarder,
}

return ingester, nil
Expand All @@ -51,28 +58,31 @@ type Statistics struct {
// forwardIngester forwards all incoming spans to a remote ingester. It also batches those
// spans to reduce network traffic.
type forwardIngester struct {
BatchTimeout time.Duration
RemoteIngester remoteIngesterConfig
mutex sync.Mutex
traceIDs map[string]bool
done chan bool
traceCache TraceCache
logger *zap.Logger
sensor sensors.Sensor
BatchTimeout time.Duration
RemoteIngester remoteIngesterConfig
mutex sync.Mutex
traceIDs map[string]bool
done chan bool
traceCache TraceCache
logger *zap.Logger
sensor sensors.Sensor
traceModeForwarder TraceModeForwarder

statistics Statistics

sync.Mutex
}

type remoteIngesterConfig struct {
URL string
Token string
traceCache TraceCache
startRemoteServer bool
logger *zap.Logger
observer event.Observer
sensor sensors.Sensor
URL string
Token string
traceCache TraceCache
startRemoteServer bool
logger *zap.Logger
observer event.Observer
sensor sensors.Sensor
traceMode bool
traceModeForwarder TraceModeForwarder
}

func (i *forwardIngester) Statistics() Statistics {
Expand All @@ -89,6 +99,12 @@ func (i *forwardIngester) SetSensor(sensor sensors.Sensor) {

func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest, requestType otlp.RequestType) (*pb.ExportTraceServiceResponse, error) {
go i.ingestSpans(request)
if i.RemoteIngester.traceMode {
err := i.traceModeForwarder.Export(ctx, request)
if err != nil {
i.logger.Error("failed to forward spans to trace mode", zap.Error(err))
}
}

return &pb.ExportTraceServiceResponse{
PartialSuccess: &pb.ExportTracePartialSuccess{
Expand Down Expand Up @@ -136,6 +152,21 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
for _, resourceSpan := range resourceSpans {
for _, scopedSpan := range resourceSpan.ScopeSpans {
for _, span := range scopedSpan.Spans {
if scopedSpan.Scope != nil {
span.Attributes = append(span.Attributes, &v11.KeyValue{
Key: traces.MetadataServiceName,
Value: &v11.AnyValue{Value: &v11.AnyValue_StringValue{StringValue: scopedSpan.Scope.Name}},
})

// Add attributes from the resource
span.Attributes = append(span.Attributes, scopedSpan.Scope.Attributes...)
}

// Add attributes from the resource
if resourceSpan.Resource != nil {
span.Attributes = append(span.Attributes, resourceSpan.Resource.Attributes...)
}

traceID := trace.TraceID(span.TraceId).String()
spans[traceID] = append(spans[traceID], span)
}
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
Mode string `mapstructure:"mode"`
Insecure bool `mapstructure:"insecure"`
SkipVerify bool `mapstructure:"skip_verify"`
TraceMode bool `mapstructure:"trace_mode"`

OTLPServer OtlpServer `mapstructure:"otlp_server"`
}
Expand Down
1 change: 1 addition & 0 deletions agent/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Flags struct {
CollectorEndpoint string
Insecure bool
SkipVerify bool
TraceMode bool
}

func (f Flags) AutomatedEnvironmentCanBeInferred() bool {
Expand Down
10 changes: 10 additions & 0 deletions agent/opentelemetry/proto/collector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# OpenTelemetry Collector Proto

This package describes the OpenTelemetry collector protocol.

## Packages

1. `common` package contains the common messages shared between different services.
2. `trace` package contains the Trace Service protos.
3. `metrics` package contains the Metrics Service protos.
4. `logs` package contains the Logs Service protos.
79 changes: 79 additions & 0 deletions agent/opentelemetry/proto/collector/logs/v1/logs_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package opentelemetry.proto.collector.logs.v1;

import "opentelemetry/proto/logs/v1/logs.proto";

option csharp_namespace = "OpenTelemetry.Proto.Collector.Logs.V1";
option java_multiple_files = true;
option java_package = "io.opentelemetry.proto.collector.logs.v1";
option java_outer_classname = "LogsServiceProto";
option go_package = "go.opentelemetry.io/proto/otlp/collector/logs/v1";

// Service that can be used to push logs between one Application instrumented with
// OpenTelemetry and an collector, or between an collector and a central collector (in this
// case logs are sent/received to/from multiple Applications).
service LogsService {
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
rpc Export(ExportLogsServiceRequest) returns (ExportLogsServiceResponse) {}
}

message ExportLogsServiceRequest {
// An array of ResourceLogs.
// For data coming from a single resource this array will typically contain one
// element. Intermediary nodes (such as OpenTelemetry Collector) that receive
// data from multiple origins typically batch the data before forwarding further and
// in that case this array will contain multiple elements.
repeated opentelemetry.proto.logs.v1.ResourceLogs resource_logs = 1;
}

message ExportLogsServiceResponse {
// The details of a partially successful export request.
//
// If the request is only partially accepted
// (i.e. when the server accepts only parts of the data and rejects the rest)
// the server MUST initialize the `partial_success` field and MUST
// set the `rejected_<signal>` with the number of items it rejected.
//
// Servers MAY also make use of the `partial_success` field to convey
// warnings/suggestions to senders even when the request was fully accepted.
// In such cases, the `rejected_<signal>` MUST have a value of `0` and
// the `error_message` MUST be non-empty.
//
// A `partial_success` message with an empty value (rejected_<signal> = 0 and
// `error_message` = "") is equivalent to it not being set/present. Senders
// SHOULD interpret it the same way as in the full success case.
ExportLogsPartialSuccess partial_success = 1;
}

message ExportLogsPartialSuccess {
// The number of rejected log records.
//
// A `rejected_<signal>` field holding a `0` value indicates that the
// request was fully accepted.
int64 rejected_log_records = 1;

// A developer-facing human-readable message in English. It should be used
// either to explain why the server rejected parts of the data during a partial
// success or to convey warnings/suggestions during a full success. The message
// should offer guidance on how users can address such issues.
//
// error_message is an optional field. An error_message with an empty value
// is equivalent to it not being set.
string error_message = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This is an API configuration to generate an HTTP/JSON -> gRPC gateway for the
# OpenTelemetry service using github.com/grpc-ecosystem/grpc-gateway.
type: google.api.Service
config_version: 3
http:
rules:
- selector: opentelemetry.proto.collector.logs.v1.LogsService.Export
post: /v1/logs
body: "*"
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package opentelemetry.proto.collector.metrics.v1;

import "opentelemetry/proto/metrics/v1/metrics.proto";

option csharp_namespace = "OpenTelemetry.Proto.Collector.Metrics.V1";
option java_multiple_files = true;
option java_package = "io.opentelemetry.proto.collector.metrics.v1";
option java_outer_classname = "MetricsServiceProto";
option go_package = "go.opentelemetry.io/proto/otlp/collector/metrics/v1";

// Service that can be used to push metrics between one Application
// instrumented with OpenTelemetry and a collector, or between a collector and a
// central collector.
service MetricsService {
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
rpc Export(ExportMetricsServiceRequest) returns (ExportMetricsServiceResponse) {}
}

message ExportMetricsServiceRequest {
// An array of ResourceMetrics.
// For data coming from a single resource this array will typically contain one
// element. Intermediary nodes (such as OpenTelemetry Collector) that receive
// data from multiple origins typically batch the data before forwarding further and
// in that case this array will contain multiple elements.
repeated opentelemetry.proto.metrics.v1.ResourceMetrics resource_metrics = 1;
}

message ExportMetricsServiceResponse {
// The details of a partially successful export request.
//
// If the request is only partially accepted
// (i.e. when the server accepts only parts of the data and rejects the rest)
// the server MUST initialize the `partial_success` field and MUST
// set the `rejected_<signal>` with the number of items it rejected.
//
// Servers MAY also make use of the `partial_success` field to convey
// warnings/suggestions to senders even when the request was fully accepted.
// In such cases, the `rejected_<signal>` MUST have a value of `0` and
// the `error_message` MUST be non-empty.
//
// A `partial_success` message with an empty value (rejected_<signal> = 0 and
// `error_message` = "") is equivalent to it not being set/present. Senders
// SHOULD interpret it the same way as in the full success case.
ExportMetricsPartialSuccess partial_success = 1;
}

message ExportMetricsPartialSuccess {
// The number of rejected data points.
//
// A `rejected_<signal>` field holding a `0` value indicates that the
// request was fully accepted.
int64 rejected_data_points = 1;

// A developer-facing human-readable message in English. It should be used
// either to explain why the server rejected parts of the data during a partial
// success or to convey warnings/suggestions during a full success. The message
// should offer guidance on how users can address such issues.
//
// error_message is an optional field. An error_message with an empty value
// is equivalent to it not being set.
string error_message = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This is an API configuration to generate an HTTP/JSON -> gRPC gateway for the
# OpenTelemetry service using github.com/grpc-ecosystem/grpc-gateway.
type: google.api.Service
config_version: 3
http:
rules:
- selector: opentelemetry.proto.collector.metrics.v1.MetricsService.Export
post: /v1/metrics
body: "*"
Loading

0 comments on commit bf6f525

Please sign in to comment.