Skip to content

Commit

Permalink
Change pdata to use the newly added [Traces|Metrics|Logs]Data. (#4214)
Browse files Browse the repository at this point in the history
The OTLP marshaler/unmarshalers will use the newly added Data messages.

Updates: #4207

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Oct 19, 2021
1 parent f85f13e commit 9f2ae4a
Show file tree
Hide file tree
Showing 13 changed files with 76 additions and 85 deletions.
34 changes: 16 additions & 18 deletions model/internal/otlp_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
package internal // import "go.opentelemetry.io/collector/model/internal"

import (
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlpcommon "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
)
Expand All @@ -27,26 +25,26 @@ import (
// as a way to prevent certain functions of pdata.Metrics data type to be callable by
// any code outside of this module.
type MetricsWrapper struct {
req *otlpcollectormetrics.ExportMetricsServiceRequest
req *otlpmetrics.MetricsData
}

// MetricsToOtlp internal helper to convert MetricsWrapper to protobuf representation.
func MetricsToOtlp(mw MetricsWrapper) *otlpcollectormetrics.ExportMetricsServiceRequest {
func MetricsToOtlp(mw MetricsWrapper) *otlpmetrics.MetricsData {
return mw.req
}

// MetricsFromOtlp internal helper to convert protobuf representation to MetricsWrapper.
func MetricsFromOtlp(req *otlpcollectormetrics.ExportMetricsServiceRequest) MetricsWrapper {
MetricsCompatibilityChanges(req)
func MetricsFromOtlp(req *otlpmetrics.MetricsData) MetricsWrapper {
metricsCompatibilityChanges(req)
return MetricsWrapper{req: req}
}

// MetricsCompatibilityChanges performs backward compatibility conversion on Metrics:
// metricsCompatibilityChanges performs backward compatibility conversion on Metrics:
// - Convert IntHistogram to Histogram. See https://github.com/open-telemetry/opentelemetry-proto/blob/f3b0ee0861d304f8f3126686ba9b01c106069cb0/opentelemetry/proto/metrics/v1/metrics.proto#L170
// - Convert IntGauge to Gauge. See https://github.com/open-telemetry/opentelemetry-proto/blob/f3b0ee0861d304f8f3126686ba9b01c106069cb0/opentelemetry/proto/metrics/v1/metrics.proto#L156
// - Convert IntSum to Sum. See https://github.com/open-telemetry/opentelemetry-proto/blob/f3b0ee0861d304f8f3126686ba9b01c106069cb0/opentelemetry/proto/metrics/v1/metrics.proto#L156
// - Converts Labels to Attributes. See https://github.com/open-telemetry/opentelemetry-proto/blob/8672494217bfc858e2a82a4e8c623d4a5530473a/opentelemetry/proto/metrics/v1/metrics.proto#L385
func MetricsCompatibilityChanges(req *otlpcollectormetrics.ExportMetricsServiceRequest) {
func metricsCompatibilityChanges(req *otlpmetrics.MetricsData) {
for _, rsm := range req.ResourceMetrics {
for _, ilm := range rsm.InstrumentationLibraryMetrics {
for _, metric := range ilm.Metrics {
Expand Down Expand Up @@ -76,25 +74,25 @@ func MetricsCompatibilityChanges(req *otlpcollectormetrics.ExportMetricsServiceR
// as a way to prevent certain functions of pdata.Traces data type to be callable by
// any code outside of this module.
type TracesWrapper struct {
req *otlpcollectortrace.ExportTraceServiceRequest
req *otlptrace.TracesData
}

// TracesToOtlp internal helper to convert TracesWrapper to protobuf representation.
func TracesToOtlp(mw TracesWrapper) *otlpcollectortrace.ExportTraceServiceRequest {
func TracesToOtlp(mw TracesWrapper) *otlptrace.TracesData {
return mw.req
}

// TracesFromOtlp internal helper to convert protobuf representation to TracesWrapper.
func TracesFromOtlp(req *otlpcollectortrace.ExportTraceServiceRequest) TracesWrapper {
TracesCompatibilityChanges(req)
func TracesFromOtlp(req *otlptrace.TracesData) TracesWrapper {
tracesCompatibilityChanges(req)
return TracesWrapper{req: req}
}

// TracesCompatibilityChanges performs backward compatibility conversion of Span Status code according to
// tracesCompatibilityChanges performs backward compatibility conversion of Span Status code according to
// OTLP specification as we are a new receiver and sender (we are pushing data to the pipelines):
// See https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L239
// See https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L253
func TracesCompatibilityChanges(req *otlpcollectortrace.ExportTraceServiceRequest) {
func tracesCompatibilityChanges(req *otlptrace.TracesData) {
for _, rss := range req.ResourceSpans {
for _, ils := range rss.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
Expand All @@ -118,16 +116,16 @@ func TracesCompatibilityChanges(req *otlpcollectortrace.ExportTraceServiceReques
// as a way to prevent certain functions of pdata.Logs data type to be callable by
// any code outside of this module.
type LogsWrapper struct {
req *otlpcollectorlog.ExportLogsServiceRequest
req *otlplogs.LogsData
}

// LogsToOtlp internal helper to convert LogsWrapper to protobuf representation.
func LogsToOtlp(l LogsWrapper) *otlpcollectorlog.ExportLogsServiceRequest {
func LogsToOtlp(l LogsWrapper) *otlplogs.LogsData {
return l.req
}

// LogsFromOtlp internal helper to convert protobuf representation to LogsWrapper.
func LogsFromOtlp(req *otlpcollectorlog.ExportLogsServiceRequest) LogsWrapper {
func LogsFromOtlp(req *otlplogs.LogsData) LogsWrapper {
return LogsWrapper{req: req}
}

Expand Down
22 changes: 10 additions & 12 deletions model/internal/otlp_wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (

"github.com/stretchr/testify/assert"

otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlpcommon "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
Expand Down Expand Up @@ -90,7 +88,7 @@ func TestDeprecatedStatusCode(t *testing.T) {

for _, test := range tests {
t.Run(test.sendCode.String()+"/"+test.sendDeprecatedCode.String(), func(t *testing.T) {
req := &otlpcollectortrace.ExportTraceServiceRequest{
req := &otlptrace.TracesData{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
Expand All @@ -109,7 +107,7 @@ func TestDeprecatedStatusCode(t *testing.T) {
},
}

TracesCompatibilityChanges(req)
tracesCompatibilityChanges(req)
spanProto := req.ResourceSpans[0].InstrumentationLibrarySpans[0].Spans[0]
// Check that DeprecatedCode is passed as is.
assert.EqualValues(t, test.expectedRcvCode, spanProto.Status.Code)
Expand Down Expand Up @@ -269,7 +267,7 @@ func TestDeprecatedIntHistogram(t *testing.T) {

for _, test := range tests {
t.Run(test.inputMetrics[0].Description, func(t *testing.T) {
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
req := &otlpmetrics.MetricsData{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{
Expand All @@ -279,7 +277,7 @@ func TestDeprecatedIntHistogram(t *testing.T) {
}},
},
}
MetricsCompatibilityChanges(req)
metricsCompatibilityChanges(req)
assert.EqualValues(t, test.outputMetrics, req.ResourceMetrics[0].InstrumentationLibraryMetrics[0].Metrics)
})
}
Expand Down Expand Up @@ -372,7 +370,7 @@ func TestDeprecatedIntGauge(t *testing.T) {

for _, test := range tests {
t.Run(test.inputMetrics[0].Description, func(t *testing.T) {
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
req := &otlpmetrics.MetricsData{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{
Expand All @@ -382,7 +380,7 @@ func TestDeprecatedIntGauge(t *testing.T) {
}},
},
}
MetricsCompatibilityChanges(req)
metricsCompatibilityChanges(req)
assert.EqualValues(t, test.outputMetrics, req.ResourceMetrics[0].InstrumentationLibraryMetrics[0].Metrics)
})
}
Expand Down Expand Up @@ -598,7 +596,7 @@ func TestDeprecatedIntSum(t *testing.T) {

for _, test := range tests {
t.Run(test.inputMetrics[0].Description, func(t *testing.T) {
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
req := &otlpmetrics.MetricsData{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{
Expand All @@ -608,7 +606,7 @@ func TestDeprecatedIntSum(t *testing.T) {
}},
},
}
MetricsCompatibilityChanges(req)
metricsCompatibilityChanges(req)
assert.EqualValues(t, test.outputMetrics, req.ResourceMetrics[0].InstrumentationLibraryMetrics[0].Metrics)
})
}
Expand Down Expand Up @@ -737,7 +735,7 @@ func TestAttributesAndLabels(t *testing.T) {

for _, test := range tests {
t.Run(test.inputMetrics[0].Description, func(t *testing.T) {
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
req := &otlpmetrics.MetricsData{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{
Expand All @@ -747,7 +745,7 @@ func TestAttributesAndLabels(t *testing.T) {
}},
},
}
MetricsCompatibilityChanges(req)
metricsCompatibilityChanges(req)
assert.EqualValues(t, test.outputMetrics, req.ResourceMetrics[0].InstrumentationLibraryMetrics[0].Metrics)
})
}
Expand Down
14 changes: 6 additions & 8 deletions model/otlp/json_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"github.com/gogo/protobuf/jsonpb"

"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -50,25 +50,23 @@ func newJSONUnmarshaler() *jsonUnmarshaler {
}

func (d *jsonUnmarshaler) UnmarshalLogs(buf []byte) (pdata.Logs, error) {
ld := &otlpcollectorlog.ExportLogsServiceRequest{}

ld := &otlplogs.LogsData{}
if err := d.delegate.Unmarshal(bytes.NewReader(buf), ld); err != nil {
return pdata.Logs{}, err
}
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(ld)), nil
}

func (d *jsonUnmarshaler) UnmarshalMetrics(buf []byte) (pdata.Metrics, error) {
md := &otlpcollectormetrics.ExportMetricsServiceRequest{}

md := &otlpmetrics.MetricsData{}
if err := d.delegate.Unmarshal(bytes.NewReader(buf), md); err != nil {
return pdata.Metrics{}, err
}
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(md)), nil
}

func (d *jsonUnmarshaler) UnmarshalTraces(buf []byte) (pdata.Traces, error) {
td := &otlpcollectortrace.ExportTraceServiceRequest{}
td := &otlptrace.TracesData{}
if err := d.delegate.Unmarshal(bytes.NewReader(buf), td); err != nil {
return pdata.Traces{}, err
}
Expand Down
12 changes: 6 additions & 6 deletions model/otlp/pb_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package otlp // import "go.opentelemetry.io/collector/model/otlp"

import (
"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand All @@ -44,19 +44,19 @@ func newPbUnmarshaler() *pbUnmarshaler {
}

func (d *pbUnmarshaler) UnmarshalLogs(buf []byte) (pdata.Logs, error) {
ld := &otlpcollectorlog.ExportLogsServiceRequest{}
ld := &otlplogs.LogsData{}
err := ld.Unmarshal(buf)
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(ld)), err
}

func (d *pbUnmarshaler) UnmarshalMetrics(buf []byte) (pdata.Metrics, error) {
md := &otlpcollectormetrics.ExportMetricsServiceRequest{}
md := &otlpmetrics.MetricsData{}
err := md.Unmarshal(buf)
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(md)), err
}

func (d *pbUnmarshaler) UnmarshalTraces(buf []byte) (pdata.Traces, error) {
td := &otlpcollectortrace.ExportTraceServiceRequest{}
td := &otlptrace.TracesData{}
err := td.Unmarshal(buf)
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(td)), err
}
3 changes: 2 additions & 1 deletion model/otlpgrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -81,7 +82,7 @@ func (lr LogsRequest) SetLogs(ld pdata.Logs) {
}

func (lr LogsRequest) Logs() pdata.Logs {
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(lr.orig))
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(&otlplogs.LogsData{ResourceLogs: lr.orig.ResourceLogs}))
}

// LogsClient is the client API for OTLP-GRPC Logs service.
Expand Down
3 changes: 2 additions & 1 deletion model/otlpgrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.opentelemetry.io/collector/model/internal"
otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1"
otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -80,7 +81,7 @@ func (mr MetricsRequest) SetMetrics(ld pdata.Metrics) {
}

func (mr MetricsRequest) Metrics() pdata.Metrics {
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(mr.orig))
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(&otlpmetrics.MetricsData{ResourceMetrics: mr.orig.ResourceMetrics}))
}

// MetricsClient is the client API for OTLP-GRPC Metrics service.
Expand Down
3 changes: 2 additions & 1 deletion model/otlpgrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.opentelemetry.io/collector/model/internal"
otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -80,7 +81,7 @@ func (tr TracesRequest) SetTraces(td pdata.Traces) {
}

func (tr TracesRequest) Traces() pdata.Traces {
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(tr.orig))
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(&otlptrace.TracesData{ResourceSpans: tr.orig.ResourceSpans}))
}

// TracesClient is the client API for OTLP-GRPC Traces service.
Expand Down
5 changes: 2 additions & 3 deletions model/pdata/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package pdata // import "go.opentelemetry.io/collector/model/pdata"

import (
"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
)

Expand Down Expand Up @@ -48,12 +47,12 @@ type LogsSizer interface {
// Must use NewLogs functions to create new instances.
// Important: zero-initialized instance is not valid for use.
type Logs struct {
orig *otlpcollectorlog.ExportLogsServiceRequest
orig *otlplogs.LogsData
}

// NewLogs creates a new Logs.
func NewLogs() Logs {
return Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
return Logs{orig: &otlplogs.LogsData{}}
}

// LogsFromInternalRep creates the internal Logs representation from the ProtoBuf. Should
Expand Down
11 changes: 5 additions & 6 deletions model/pdata/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/model/internal"
otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1"
)

Expand Down Expand Up @@ -50,17 +49,17 @@ func TestLogRecordCount(t *testing.T) {

func TestLogRecordCountWithEmpty(t *testing.T) {
assert.Zero(t, NewLogs().LogRecordCount())
assert.Zero(t, Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
assert.Zero(t, Logs{orig: &otlplogs.LogsData{
ResourceLogs: []*otlplogs.ResourceLogs{{}},
}}.LogRecordCount())
assert.Zero(t, Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
assert.Zero(t, Logs{orig: &otlplogs.LogsData{
ResourceLogs: []*otlplogs.ResourceLogs{
{
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{{}},
},
},
}}.LogRecordCount())
assert.Equal(t, 1, Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
assert.Equal(t, 1, Logs{orig: &otlplogs.LogsData{
ResourceLogs: []*otlplogs.ResourceLogs{
{
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{
Expand All @@ -74,10 +73,10 @@ func TestLogRecordCountWithEmpty(t *testing.T) {
}

func TestToFromLogProto(t *testing.T) {
wrapper := internal.LogsFromOtlp(&otlpcollectorlog.ExportLogsServiceRequest{})
wrapper := internal.LogsFromOtlp(&otlplogs.LogsData{})
ld := LogsFromInternalRep(wrapper)
assert.EqualValues(t, NewLogs(), ld)
assert.EqualValues(t, &otlpcollectorlog.ExportLogsServiceRequest{}, ld.orig)
assert.EqualValues(t, &otlplogs.LogsData{}, ld.orig)
}

func TestLogsClone(t *testing.T) {
Expand Down
Loading

0 comments on commit 9f2ae4a

Please sign in to comment.