Skip to content

Commit

Permalink
Change pdata to use the newly added [Traces|Metrics|Logs]Data.
Browse files Browse the repository at this point in the history
The OTLP marshaler/unmarshalers will use the newly added Data messages.

Updates: #4207

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Oct 19, 2021
1 parent f85f13e commit 4ca44d2
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 49 deletions.
34 changes: 16 additions & 18 deletions model/internal/otlp_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
package internal // import "go.opentelemetry.io/collector/model/internal"

import (
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlpcommon "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
)
Expand All @@ -27,26 +25,26 @@ import (
// as a way to prevent certain functions of pdata.Metrics data type to be callable by
// any code outside of this module.
type MetricsWrapper struct {
req *otlpcollectormetrics.ExportMetricsServiceRequest
req *otlpmetrics.MetricsData
}

// MetricsToOtlp internal helper to convert MetricsWrapper to protobuf representation.
func MetricsToOtlp(mw MetricsWrapper) *otlpcollectormetrics.ExportMetricsServiceRequest {
func MetricsToOtlp(mw MetricsWrapper) *otlpmetrics.MetricsData {
return mw.req
}

// MetricsFromOtlp internal helper to convert protobuf representation to MetricsWrapper.
func MetricsFromOtlp(req *otlpcollectormetrics.ExportMetricsServiceRequest) MetricsWrapper {
MetricsCompatibilityChanges(req)
func MetricsFromOtlp(req *otlpmetrics.MetricsData) MetricsWrapper {
metricsCompatibilityChanges(req)
return MetricsWrapper{req: req}
}

// MetricsCompatibilityChanges performs backward compatibility conversion on Metrics:
// metricsCompatibilityChanges performs backward compatibility conversion on Metrics:
// - Convert IntHistogram to Histogram. See https://github.com/open-telemetry/opentelemetry-proto/blob/f3b0ee0861d304f8f3126686ba9b01c106069cb0/opentelemetry/proto/metrics/v1/metrics.proto#L170
// - Convert IntGauge to Gauge. See https://github.com/open-telemetry/opentelemetry-proto/blob/f3b0ee0861d304f8f3126686ba9b01c106069cb0/opentelemetry/proto/metrics/v1/metrics.proto#L156
// - Convert IntSum to Sum. See https://github.com/open-telemetry/opentelemetry-proto/blob/f3b0ee0861d304f8f3126686ba9b01c106069cb0/opentelemetry/proto/metrics/v1/metrics.proto#L156
// - Converts Labels to Attributes. See https://github.com/open-telemetry/opentelemetry-proto/blob/8672494217bfc858e2a82a4e8c623d4a5530473a/opentelemetry/proto/metrics/v1/metrics.proto#L385
func MetricsCompatibilityChanges(req *otlpcollectormetrics.ExportMetricsServiceRequest) {
func metricsCompatibilityChanges(req *otlpmetrics.MetricsData) {
for _, rsm := range req.ResourceMetrics {
for _, ilm := range rsm.InstrumentationLibraryMetrics {
for _, metric := range ilm.Metrics {
Expand Down Expand Up @@ -76,25 +74,25 @@ func MetricsCompatibilityChanges(req *otlpcollectormetrics.ExportMetricsServiceR
// as a way to prevent certain functions of pdata.Traces data type to be callable by
// any code outside of this module.
type TracesWrapper struct {
req *otlpcollectortrace.ExportTraceServiceRequest
req *otlptrace.TracesData
}

// TracesToOtlp internal helper to convert TracesWrapper to protobuf representation.
func TracesToOtlp(mw TracesWrapper) *otlpcollectortrace.ExportTraceServiceRequest {
func TracesToOtlp(mw TracesWrapper) *otlptrace.TracesData {
return mw.req
}

// TracesFromOtlp internal helper to convert protobuf representation to TracesWrapper.
func TracesFromOtlp(req *otlpcollectortrace.ExportTraceServiceRequest) TracesWrapper {
TracesCompatibilityChanges(req)
func TracesFromOtlp(req *otlptrace.TracesData) TracesWrapper {
tracesCompatibilityChanges(req)
return TracesWrapper{req: req}
}

// TracesCompatibilityChanges performs backward compatibility conversion of Span Status code according to
// tracesCompatibilityChanges performs backward compatibility conversion of Span Status code according to
// OTLP specification as we are a new receiver and sender (we are pushing data to the pipelines):
// See https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L239
// See https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L253
func TracesCompatibilityChanges(req *otlpcollectortrace.ExportTraceServiceRequest) {
func tracesCompatibilityChanges(req *otlptrace.TracesData) {
for _, rss := range req.ResourceSpans {
for _, ils := range rss.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
Expand All @@ -118,16 +116,16 @@ func TracesCompatibilityChanges(req *otlpcollectortrace.ExportTraceServiceReques
// as a way to prevent certain functions of pdata.Logs data type to be callable by
// any code outside of this module.
type LogsWrapper struct {
req *otlpcollectorlog.ExportLogsServiceRequest
req *otlplogs.LogsData
}

// LogsToOtlp internal helper to convert LogsWrapper to protobuf representation.
func LogsToOtlp(l LogsWrapper) *otlpcollectorlog.ExportLogsServiceRequest {
func LogsToOtlp(l LogsWrapper) *otlplogs.LogsData {
return l.req
}

// LogsFromOtlp internal helper to convert protobuf representation to LogsWrapper.
func LogsFromOtlp(req *otlpcollectorlog.ExportLogsServiceRequest) LogsWrapper {
func LogsFromOtlp(req *otlplogs.LogsData) LogsWrapper {
return LogsWrapper{req: req}
}

Expand Down
10 changes: 5 additions & 5 deletions model/internal/otlp_wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestDeprecatedStatusCode(t *testing.T) {
},
}

TracesCompatibilityChanges(req)
tracesCompatibilityChanges(req)
spanProto := req.ResourceSpans[0].InstrumentationLibrarySpans[0].Spans[0]
// Check that DeprecatedCode is passed as is.
assert.EqualValues(t, test.expectedRcvCode, spanProto.Status.Code)
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestDeprecatedIntHistogram(t *testing.T) {
}},
},
}
MetricsCompatibilityChanges(req)
metricsCompatibilityChanges(req)
assert.EqualValues(t, test.outputMetrics, req.ResourceMetrics[0].InstrumentationLibraryMetrics[0].Metrics)
})
}
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestDeprecatedIntGauge(t *testing.T) {
}},
},
}
MetricsCompatibilityChanges(req)
metricsCompatibilityChanges(req)
assert.EqualValues(t, test.outputMetrics, req.ResourceMetrics[0].InstrumentationLibraryMetrics[0].Metrics)
})
}
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestDeprecatedIntSum(t *testing.T) {
}},
},
}
MetricsCompatibilityChanges(req)
metricsCompatibilityChanges(req)
assert.EqualValues(t, test.outputMetrics, req.ResourceMetrics[0].InstrumentationLibraryMetrics[0].Metrics)
})
}
Expand Down Expand Up @@ -747,7 +747,7 @@ func TestAttributesAndLabels(t *testing.T) {
}},
},
}
MetricsCompatibilityChanges(req)
metricsCompatibilityChanges(req)
assert.EqualValues(t, test.outputMetrics, req.ResourceMetrics[0].InstrumentationLibraryMetrics[0].Metrics)
})
}
Expand Down
14 changes: 6 additions & 8 deletions model/otlp/json_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"github.com/gogo/protobuf/jsonpb"

"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -50,25 +50,23 @@ func newJSONUnmarshaler() *jsonUnmarshaler {
}

func (d *jsonUnmarshaler) UnmarshalLogs(buf []byte) (pdata.Logs, error) {
ld := &otlpcollectorlog.ExportLogsServiceRequest{}

ld := &otlplogs.LogsData{}
if err := d.delegate.Unmarshal(bytes.NewReader(buf), ld); err != nil {
return pdata.Logs{}, err
}
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(ld)), nil
}

func (d *jsonUnmarshaler) UnmarshalMetrics(buf []byte) (pdata.Metrics, error) {
md := &otlpcollectormetrics.ExportMetricsServiceRequest{}

md := &otlpmetrics.MetricsData{}
if err := d.delegate.Unmarshal(bytes.NewReader(buf), md); err != nil {
return pdata.Metrics{}, err
}
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(md)), nil
}

func (d *jsonUnmarshaler) UnmarshalTraces(buf []byte) (pdata.Traces, error) {
td := &otlpcollectortrace.ExportTraceServiceRequest{}
td := &otlptrace.TracesData{}
if err := d.delegate.Unmarshal(bytes.NewReader(buf), td); err != nil {
return pdata.Traces{}, err
}
Expand Down
12 changes: 6 additions & 6 deletions model/otlp/pb_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package otlp // import "go.opentelemetry.io/collector/model/otlp"

import (
"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand All @@ -44,19 +44,19 @@ func newPbUnmarshaler() *pbUnmarshaler {
}

func (d *pbUnmarshaler) UnmarshalLogs(buf []byte) (pdata.Logs, error) {
ld := &otlpcollectorlog.ExportLogsServiceRequest{}
ld := &otlplogs.LogsData{}
err := ld.Unmarshal(buf)
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(ld)), err
}

func (d *pbUnmarshaler) UnmarshalMetrics(buf []byte) (pdata.Metrics, error) {
md := &otlpcollectormetrics.ExportMetricsServiceRequest{}
md := &otlpmetrics.MetricsData{}
err := md.Unmarshal(buf)
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(md)), err
}

func (d *pbUnmarshaler) UnmarshalTraces(buf []byte) (pdata.Traces, error) {
td := &otlpcollectortrace.ExportTraceServiceRequest{}
td := &otlptrace.TracesData{}
err := td.Unmarshal(buf)
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(td)), err
}
3 changes: 2 additions & 1 deletion model/otlpgrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -81,7 +82,7 @@ func (lr LogsRequest) SetLogs(ld pdata.Logs) {
}

func (lr LogsRequest) Logs() pdata.Logs {
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(lr.orig))
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(&otlplogs.LogsData{ResourceLogs: lr.orig.ResourceLogs}))
}

// LogsClient is the client API for OTLP-GRPC Logs service.
Expand Down
3 changes: 2 additions & 1 deletion model/otlpgrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.opentelemetry.io/collector/model/internal"
otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -80,7 +81,7 @@ func (mr MetricsRequest) SetMetrics(ld pdata.Metrics) {
}

func (mr MetricsRequest) Metrics() pdata.Metrics {
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(mr.orig))
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(&otlpmetrics.MetricsData{ResourceMetrics: mr.orig.ResourceMetrics}))
}

// MetricsClient is the client API for OTLP-GRPC Metrics service.
Expand Down
3 changes: 2 additions & 1 deletion model/otlpgrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.opentelemetry.io/collector/model/internal"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -80,7 +81,7 @@ func (tr TracesRequest) SetTraces(td pdata.Traces) {
}

func (tr TracesRequest) Traces() pdata.Traces {
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(tr.orig))
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(&otlptrace.TracesData{ResourceSpans: tr.orig.ResourceSpans}))
}

// TracesClient is the client API for OTLP-GRPC Traces service.
Expand Down
5 changes: 2 additions & 3 deletions model/pdata/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package pdata // import "go.opentelemetry.io/collector/model/pdata"

import (
"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
)

Expand Down Expand Up @@ -48,12 +47,12 @@ type LogsSizer interface {
// Must use NewLogs functions to create new instances.
// Important: zero-initialized instance is not valid for use.
type Logs struct {
orig *otlpcollectorlog.ExportLogsServiceRequest
orig *otlplogs.LogsData
}

// NewLogs creates a new Logs.
func NewLogs() Logs {
return Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
return Logs{orig: &otlplogs.LogsData{}}
}

// LogsFromInternalRep creates the internal Logs representation from the ProtoBuf. Should
Expand Down
5 changes: 2 additions & 3 deletions model/pdata/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package pdata // import "go.opentelemetry.io/collector/model/pdata"

import (
"go.opentelemetry.io/collector/model/internal"
otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
)

Expand Down Expand Up @@ -47,12 +46,12 @@ type MetricsSizer interface {
// Outside of the core repository, the metrics pipeline cannot be converted to the new model since data.MetricData is
// part of the internal package.
type Metrics struct {
orig *otlpcollectormetrics.ExportMetricsServiceRequest
orig *otlpmetrics.MetricsData
}

// NewMetrics creates a new Metrics.
func NewMetrics() Metrics {
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{}}
return Metrics{orig: &otlpmetrics.MetricsData{}}
}

// MetricsFromInternalRep creates Metrics from the internal representation.
Expand Down
5 changes: 2 additions & 3 deletions model/pdata/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package pdata // import "go.opentelemetry.io/collector/model/pdata"

import (
"go.opentelemetry.io/collector/model/internal"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
)

Expand All @@ -43,12 +42,12 @@ type TracesSizer interface {

// Traces is the top-level struct that is propagated through the traces pipeline.
type Traces struct {
orig *otlpcollectortrace.ExportTraceServiceRequest
orig *otlptrace.TracesData
}

// NewTraces creates a new Traces.
func NewTraces() Traces {
return Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
return Traces{orig: &otlptrace.TracesData{}}
}

// TracesFromInternalRep creates Traces from the internal representation.
Expand Down

0 comments on commit 4ca44d2

Please sign in to comment.