diff --git a/.chloggen/azureeventhub-trace-receiver.yaml b/.chloggen/azureeventhub-trace-receiver.yaml new file mode 100755 index 000000000000..2676a7e3514e --- /dev/null +++ b/.chloggen/azureeventhub-trace-receiver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: azureeventhubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added traces support in azureeventhubreceiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33583] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/pkg/translator/azure/resources_to_traces.go b/pkg/translator/azure/resources_to_traces.go new file mode 100644 index 000000000000..453770d7ea80 --- /dev/null +++ b/pkg/translator/azure/resources_to_traces.go @@ -0,0 +1,183 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package azure // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure" + +import ( + "bytes" + "encoding/hex" + "net/url" + + jsoniter "github.com/json-iterator/go" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.13.0" + "go.uber.org/zap" +) + +const ( + // Constants for OpenTelemetry Specs + traceAzureResourceID = "azure.resource.id" + traceScopeName = "otelcol/azureresourcetraces" +) + +type azureTracesRecords struct { + Records []azureTracesRecord `json:"records"` +} + +// Azure Trace Records based on Azure AppRequests & AppDependencies table data +// the common record schema reference: +// https://learn.microsoft.com/en-us/azure/azure-monitor/reference/tables/apprequests +// https://learn.microsoft.com/en-us/azure/azure-monitor/reference/tables/appdependencies +type azureTracesRecord struct { + Time string `json:"time"` + ResourceID string `json:"resourceId"` + ResourceGUID string `json:"ResourceGUID"` + Type string `json:"Type"` + AppRoleInstance string `json:"AppRoleInstance"` + AppRoleName string `json:"AppRoleName"` + AppVersion string `json:"AppVersion"` + ClientCity string `json:"ClientCity"` + ClientCountryOrRegion string `json:"ClientCountryOrRegion"` + ClientIP string `json:"ClientIP"` + ClientStateOrProvince string `json:"ClientStateOrProvince"` + ClientType string `json:"ClientType"` + IKey string `json:"IKey"` + OperationName string `json:"OperationName"` + OperationID string `json:"OperationId"` + ParentID string `json:"ParentId"` + SDKVersion string `json:"SDKVersion"` + Properties map[string]string `json:"Properties"` + Measurements map[string]float64 `json:"Measurements"` + SpanID string `json:"Id"` + Name string `json:"Name"` + URL string `json:"Url"` + Source string `json:"Source"` + Success bool `json:"Success"` + ResultCode string `json:"ResultCode"` + DurationMs float64 `json:"DurationMs"` + PerformanceBucket string `json:"PerformanceBucket"` + ItemCount float64 `json:"ItemCount"` +} + +var _ ptrace.Unmarshaler = (*TracesUnmarshaler)(nil) + +type TracesUnmarshaler struct { + Version string + Logger *zap.Logger +} + +func (r TracesUnmarshaler) UnmarshalTraces(buf []byte) (ptrace.Traces, error) { + t := ptrace.NewTraces() + + var azureTraces azureTracesRecords + decoder := jsoniter.NewDecoder(bytes.NewReader(buf)) + err := decoder.Decode(&azureTraces) + if err != nil { + return t, err + } + + resourceTraces := t.ResourceSpans().AppendEmpty() + resource := resourceTraces.Resource() + resource.Attributes().PutStr(conventions.AttributeTelemetrySDKName, traceScopeName) + resource.Attributes().PutStr(conventions.AttributeTelemetrySDKLanguage, conventions.AttributeTelemetrySDKLanguageGo) + resource.Attributes().PutStr(conventions.AttributeTelemetrySDKVersion, r.Version) + resource.Attributes().PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure) + + scopeSpans := resourceTraces.ScopeSpans().AppendEmpty() + + spans := scopeSpans.Spans() + + resourceID := "" + for _, azureTrace := range azureTraces.Records { + if resourceID == "" && azureTrace.ResourceID != "" { + resourceID = azureTrace.ResourceID + } + + resource.Attributes().PutStr("service.name", azureTrace.AppRoleName) + + nanos, err := asTimestamp(azureTrace.Time) + if err != nil { + r.Logger.Warn("Invalid Timestamp", zap.String("time", azureTrace.Time)) + continue + } + + var traceID, traceErr = TraceIDFromHex(azureTrace.OperationID) + if traceErr != nil { + r.Logger.Warn("Invalid TraceID", zap.String("traceID", azureTrace.OperationID)) + return t, err + } + var spanID, spanErr = SpanIDFromHex(azureTrace.SpanID) + if spanErr != nil { + r.Logger.Warn("Invalid SpanID", zap.String("spanID", azureTrace.SpanID)) + return t, err + } + var parentID, parentErr = SpanIDFromHex(azureTrace.ParentID) + if parentErr != nil { + r.Logger.Warn("Invalid ParentID", zap.String("parentID", azureTrace.ParentID)) + return t, err + } + + span := spans.AppendEmpty() + span.SetTraceID(traceID) + span.SetSpanID(spanID) + span.SetParentSpanID(parentID) + + span.Attributes().PutStr("OperationName", azureTrace.OperationName) + span.Attributes().PutStr("AppRoleName", azureTrace.AppRoleName) + span.Attributes().PutStr("AppRoleInstance", azureTrace.AppRoleInstance) + span.Attributes().PutStr("Type", azureTrace.Type) + + span.Attributes().PutStr("http.url", azureTrace.URL) + + urlObj, _ := url.Parse(azureTrace.URL) + hostname := urlObj.Host + hostpath := urlObj.Path + scheme := urlObj.Scheme + + span.Attributes().PutStr("http.host", hostname) + span.Attributes().PutStr("http.path", hostpath) + span.Attributes().PutStr("http.response.status_code", azureTrace.ResultCode) + span.Attributes().PutStr("http.client_ip", azureTrace.ClientIP) + span.Attributes().PutStr("http.client_city", azureTrace.ClientCity) + span.Attributes().PutStr("http.client_type", azureTrace.ClientType) + span.Attributes().PutStr("http.client_state", azureTrace.ClientStateOrProvince) + span.Attributes().PutStr("http.client_type", azureTrace.ClientType) + span.Attributes().PutStr("http.client_country", azureTrace.ClientCountryOrRegion) + span.Attributes().PutStr("http.scheme", scheme) + span.Attributes().PutStr("http.method", azureTrace.Properties["HTTP Method"]) + + span.SetKind(ptrace.SpanKindServer) + span.SetName(azureTrace.Name) + span.SetStartTimestamp(nanos) + span.SetEndTimestamp(nanos + pcommon.Timestamp(azureTrace.DurationMs*1e6)) + } + + if resourceID != "" { + resourceTraces.Resource().Attributes().PutStr(traceAzureResourceID, resourceID) + } else { + r.Logger.Warn("No ResourceID Set on Traces!") + } + + return t, nil +} + +func TraceIDFromHex(hexStr string) (pcommon.TraceID, error) { + bytes, err := hex.DecodeString(hexStr) + if err != nil { + return pcommon.TraceID{}, err + } + var id pcommon.TraceID + copy(id[:], bytes) + return id, nil +} + +func SpanIDFromHex(hexStr string) (pcommon.SpanID, error) { + bytes, err := hex.DecodeString(hexStr) + if err != nil { + return pcommon.SpanID{}, err + } + var id pcommon.SpanID + copy(id[:], bytes) + return id, nil +} diff --git a/receiver/azureeventhubreceiver/README.md b/receiver/azureeventhubreceiver/README.md index dc899b3112c6..9cf9a13b4734 100644 --- a/receiver/azureeventhubreceiver/README.md +++ b/receiver/azureeventhubreceiver/README.md @@ -3,7 +3,7 @@ | Status | | | ------------- |-----------| -| Stability | [alpha]: metrics, logs | +| Stability | [alpha]: metrics, logs, traces | | Distributions | [contrib] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fazureeventhub%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fazureeventhub) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fazureeventhub%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fazureeventhub) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme), [@cparkins](https://www.github.com/cparkins) \| Seeking more code owners! | @@ -120,4 +120,16 @@ with a Data Points that represents the values for the Metric including: Total, Minimum, Maximum, Average and Count. +Traces based on Azure Application Insights array of records from `AppRequests` & `AppDependencies` with the following fields. + +| Azure | Open Telemetry | +|-------------|-------------------------------------------------------| +| Time | start_time(time_unix_nano(time)) | +| | end_time(start_time + time_unix_nano(durationMs)) | +| Name | span.name | +| OperationId | trace.id | +| ParentId | span.parentId | +| Id | span.id | +| AppRoleName | service.name | + [storage extension]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage diff --git a/receiver/azureeventhubreceiver/azureresourcetraces_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcetraces_unmarshaler.go new file mode 100755 index 000000000000..40fed32a62ff --- /dev/null +++ b/receiver/azureeventhubreceiver/azureresourcetraces_unmarshaler.go @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver" + +import ( + eventhub "github.com/Azure/azure-event-hubs-go/v3" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure" +) + +type azureTracesEventUnmarshaler struct { + unmarshaler *azure.TracesUnmarshaler +} + +func newAzureTracesUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventTracesUnmarshaler { + return azureTracesEventUnmarshaler{ + unmarshaler: &azure.TracesUnmarshaler{ + Version: buildInfo.Version, + Logger: logger, + }, + } +} + +// UnmarshalTraces takes a byte array containing a JSON-encoded +// payload with Azure records and transforms it into +// an OpenTelemetry ptraces.traces object. The data in the Azure +// record appears as fields and attributes in the +// OpenTelemetry representation; the bodies of the +// OpenTelemetry trace records are empty. +func (r azureTracesEventUnmarshaler) UnmarshalTraces(event *eventhub.Event) (ptrace.Traces, error) { + return r.unmarshaler.UnmarshalTraces(event.Data) +} diff --git a/receiver/azureeventhubreceiver/eventhubhandler_test.go b/receiver/azureeventhubreceiver/eventhubhandler_test.go index 772b6faed724..5058ffe168e4 100644 --- a/receiver/azureeventhubreceiver/eventhubhandler_test.go +++ b/receiver/azureeventhubreceiver/eventhubhandler_test.go @@ -57,15 +57,20 @@ func (m mockListenerHandleWrapper) Err() error { } type mockDataConsumer struct { - logsUnmarshaler eventLogsUnmarshaler - nextLogsConsumer consumer.Logs - obsrecv *receiverhelper.ObsReport + logsUnmarshaler eventLogsUnmarshaler + nextLogsConsumer consumer.Logs + nextTracesConsumer consumer.Traces + obsrecv *receiverhelper.ObsReport } func (m *mockDataConsumer) setNextLogsConsumer(nextLogsConsumer consumer.Logs) { m.nextLogsConsumer = nextLogsConsumer } +func (m *mockDataConsumer) setNextTracesConsumer(nextTracesConsumer consumer.Traces) { + m.nextTracesConsumer = nextTracesConsumer +} + func (m *mockDataConsumer) setNextMetricsConsumer(_ consumer.Metrics) {} func (m *mockDataConsumer) consume(ctx context.Context, event *eventhub.Event) error { diff --git a/receiver/azureeventhubreceiver/factory.go b/receiver/azureeventhubreceiver/factory.go index 6ad588b2ebc2..ae857545cede 100644 --- a/receiver/azureeventhubreceiver/factory.go +++ b/receiver/azureeventhubreceiver/factory.go @@ -38,7 +38,9 @@ func NewFactory() receiver.Factory { metadata.Type, createDefaultConfig, receiver.WithLogs(f.createLogsReceiver, metadata.LogsStability), - receiver.WithMetrics(f.createMetricsReceiver, metadata.MetricsStability)) + receiver.WithMetrics(f.createMetricsReceiver, metadata.MetricsStability), + receiver.WithTraces(f.createTracesReceiver, metadata.TracesStability), + ) } func createDefaultConfig() component.Config { @@ -79,6 +81,23 @@ func (f *eventhubReceiverFactory) createMetricsReceiver( return receiver, nil } +func (f *eventhubReceiverFactory) createTracesReceiver( + _ context.Context, + settings receiver.Settings, + cfg component.Config, + nextConsumer consumer.Traces, +) (receiver.Traces, error) { + + receiver, err := f.getReceiver(component.DataTypeTraces, cfg, settings) + if err != nil { + return nil, err + } + + receiver.(dataConsumer).setNextTracesConsumer(nextConsumer) + + return receiver, nil +} + func (f *eventhubReceiverFactory) getReceiver( receiverType component.Type, cfg component.Config, @@ -95,6 +114,7 @@ func (f *eventhubReceiverFactory) getReceiver( var logsUnmarshaler eventLogsUnmarshaler var metricsUnmarshaler eventMetricsUnmarshaler + var tracesUnmarshaler eventTracesUnmarshaler switch receiverType { case component.DataTypeLogs: if logFormat(receiverConfig.Format) == rawLogFormat { @@ -110,7 +130,12 @@ func (f *eventhubReceiverFactory) getReceiver( metricsUnmarshaler = newAzureResourceMetricsUnmarshaler(settings.BuildInfo, settings.Logger) } case component.DataTypeTraces: - err = errors.New("unsupported traces data") + if logFormat(receiverConfig.Format) == rawLogFormat { + tracesUnmarshaler = nil + err = errors.New("raw format not supported for Traces") + } else { + tracesUnmarshaler = newAzureTracesUnmarshaler(settings.BuildInfo, settings.Logger) + } } if err != nil { @@ -120,7 +145,7 @@ func (f *eventhubReceiverFactory) getReceiver( eventHandler := newEventhubHandler(receiverConfig, settings) var rcvr component.Component - rcvr, err = newReceiver(receiverType, logsUnmarshaler, metricsUnmarshaler, eventHandler, settings) + rcvr, err = newReceiver(receiverType, logsUnmarshaler, metricsUnmarshaler, tracesUnmarshaler, eventHandler, settings) return rcvr }) diff --git a/receiver/azureeventhubreceiver/generated_component_test.go b/receiver/azureeventhubreceiver/generated_component_test.go index 6b8729b08944..5c63ccdbb3fa 100644 --- a/receiver/azureeventhubreceiver/generated_component_test.go +++ b/receiver/azureeventhubreceiver/generated_component_test.go @@ -44,6 +44,13 @@ func TestComponentLifecycle(t *testing.T) { return factory.CreateMetricsReceiver(ctx, set, cfg, consumertest.NewNop()) }, }, + + { + name: "traces", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateTracesReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, } cm, err := confmaptest.LoadConf("metadata.yaml") diff --git a/receiver/azureeventhubreceiver/internal/metadata/generated_status.go b/receiver/azureeventhubreceiver/internal/metadata/generated_status.go index 81e8d8cc1701..eb33d2cfc1e8 100644 --- a/receiver/azureeventhubreceiver/internal/metadata/generated_status.go +++ b/receiver/azureeventhubreceiver/internal/metadata/generated_status.go @@ -13,4 +13,5 @@ var ( const ( MetricsStability = component.StabilityLevelAlpha LogsStability = component.StabilityLevelAlpha + TracesStability = component.StabilityLevelAlpha ) diff --git a/receiver/azureeventhubreceiver/metadata.yaml b/receiver/azureeventhubreceiver/metadata.yaml index 155dcd1bbea5..034b0c4f92e7 100644 --- a/receiver/azureeventhubreceiver/metadata.yaml +++ b/receiver/azureeventhubreceiver/metadata.yaml @@ -3,7 +3,7 @@ type: azureeventhub status: class: receiver stability: - alpha: [metrics, logs] + alpha: [metrics, logs, traces] distributions: [contrib] codeowners: active: [atoulme, cparkins] diff --git a/receiver/azureeventhubreceiver/receiver.go b/receiver/azureeventhubreceiver/receiver.go index 3bfd55500bde..88490eef5a94 100644 --- a/receiver/azureeventhubreceiver/receiver.go +++ b/receiver/azureeventhubreceiver/receiver.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" @@ -24,6 +25,7 @@ type dataConsumer interface { consume(ctx context.Context, event *eventhub.Event) error setNextLogsConsumer(nextLogsConsumer consumer.Logs) setNextMetricsConsumer(nextLogsConsumer consumer.Metrics) + setNextTracesConsumer(nextTracesConsumer consumer.Traces) } type eventLogsUnmarshaler interface { @@ -34,14 +36,20 @@ type eventMetricsUnmarshaler interface { UnmarshalMetrics(event *eventhub.Event) (pmetric.Metrics, error) } +type eventTracesUnmarshaler interface { + UnmarshalTraces(event *eventhub.Event) (ptrace.Traces, error) +} + type eventhubReceiver struct { eventHandler *eventhubHandler dataType component.Type logger *zap.Logger logsUnmarshaler eventLogsUnmarshaler metricsUnmarshaler eventMetricsUnmarshaler + tracesUnmarshaler eventTracesUnmarshaler nextLogsConsumer consumer.Logs nextMetricsConsumer consumer.Metrics + nextTracesConsumer consumer.Traces obsrecv *receiverhelper.ObsReport } @@ -61,6 +69,10 @@ func (receiver *eventhubReceiver) setNextMetricsConsumer(nextMetricsConsumer con receiver.nextMetricsConsumer = nextMetricsConsumer } +func (receiver *eventhubReceiver) setNextTracesConsumer(nextTracesConsumer consumer.Traces) { + receiver.nextTracesConsumer = nextTracesConsumer +} + func (receiver *eventhubReceiver) consume(ctx context.Context, event *eventhub.Event) error { switch receiver.dataType { case component.DataTypeLogs: @@ -68,7 +80,7 @@ func (receiver *eventhubReceiver) consume(ctx context.Context, event *eventhub.E case component.DataTypeMetrics: return receiver.consumeMetrics(ctx, event) case component.DataTypeTraces: - fallthrough + return receiver.consumeTraces(ctx, event) default: return fmt.Errorf("invalid data type: %v", receiver.dataType) } @@ -123,10 +135,36 @@ func (receiver *eventhubReceiver) consumeMetrics(ctx context.Context, event *eve return err } +func (receiver *eventhubReceiver) consumeTraces(ctx context.Context, event *eventhub.Event) error { + + if receiver.nextTracesConsumer == nil { + return nil + } + + if receiver.tracesUnmarshaler == nil { + return errors.New("unable to unmarshal traces with configured format") + } + + tracesContext := receiver.obsrecv.StartTracesOp(ctx) + + traces, err := receiver.tracesUnmarshaler.UnmarshalTraces(event) + if err != nil { + return fmt.Errorf("failed to unmarshal traces: %w", err) + } + + receiver.logger.Debug("traces Records", zap.Any("traces", traces)) + err = receiver.nextTracesConsumer.ConsumeTraces(tracesContext, traces) + + receiver.obsrecv.EndTracesOp(tracesContext, metadata.Type.String(), 1, err) + + return err +} + func newReceiver( receiverType component.Type, logsUnmarshaler eventLogsUnmarshaler, metricsUnmarshaler eventMetricsUnmarshaler, + tracesUnmarshaler eventTracesUnmarshaler, eventHandler *eventhubHandler, settings receiver.Settings, ) (component.Component, error) { @@ -146,6 +184,7 @@ func newReceiver( logger: settings.Logger, logsUnmarshaler: logsUnmarshaler, metricsUnmarshaler: metricsUnmarshaler, + tracesUnmarshaler: tracesUnmarshaler, obsrecv: obsrecv, }