diff --git a/receiver/cloudfoundryreceiver/receiver.go b/receiver/cloudfoundryreceiver/receiver.go index cb94b8c431b9..8e73dd2ffc1b 100644 --- a/receiver/cloudfoundryreceiver/receiver.go +++ b/receiver/cloudfoundryreceiver/receiver.go @@ -38,14 +38,12 @@ const ( // newCloudFoundryReceiver implements the receiver.Metrics for Cloud Foundry protocol. type cloudFoundryReceiver struct { - settings component.TelemetrySettings - cancel context.CancelFunc - config Config - nextMetricsConsumer consumer.Metrics - nextLogsConsumer consumer.Logs - // nextTracesConsumer consumer.Traces + settings component.TelemetrySettings + cancel context.CancelFunc + config Config + nextMetrics consumer.Metrics + nextLogs consumer.Logs obsrecv *receiverhelper.ObsReport - telemetryType telemetryType goroutines sync.WaitGroup receiverStartTime time.Time } @@ -66,12 +64,11 @@ func newCloudFoundryMetricsReceiver( } result := &cloudFoundryReceiver{ - settings: settings.TelemetrySettings, - config: config, - nextMetricsConsumer: nextConsumer, - telemetryType: telemetryTypeMetrics, - obsrecv: obsrecv, - receiverStartTime: time.Now(), + settings: settings.TelemetrySettings, + config: config, + nextMetrics: nextConsumer, + obsrecv: obsrecv, + receiverStartTime: time.Now(), } return result, nil } @@ -93,8 +90,7 @@ func newCloudFoundryLogsReceiver( result := &cloudFoundryReceiver{ settings: settings.TelemetrySettings, config: config, - nextLogsConsumer: nextConsumer, - telemetryType: telemetryTypeLogs, + nextLogs: nextConsumer, obsrecv: obsrecv, receiverStartTime: time.Now(), } @@ -126,29 +122,26 @@ func (cfr *cloudFoundryReceiver) Start(ctx context.Context, host component.Host) go func() { defer cfr.goroutines.Done() cfr.settings.Logger.Debug("cloud foundry receiver starting") - _, tokenErr = tokenProvider.ProvideToken() if tokenErr != nil { cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("cloud foundry receiver failed to fetch initial token from UAA: %w", tokenErr))) return } - - envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID, cfr.telemetryType) - if err != nil { - cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope stream: %w", err))) - return - } - - switch cfr.telemetryType { - case telemetryTypeMetrics: - cfr.streamMetrics(innerCtx, envelopeStream) - case telemetryTypeLogs: + if cfr.nextLogs != nil { + envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID, telemetryTypeLogs) + if err != nil { + cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope log stream receiver: %w", err))) + return + } cfr.streamLogs(innerCtx, envelopeStream) - case telemetryTypeTraces: - // TODO - // cfr.streamTelemetry(innerCtx, envelopeStream) + } else if cfr.nextMetrics != nil { + envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID, telemetryTypeMetrics) + if err != nil { + cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope metrics stream receiver: %w", err))) + return + } + cfr.streamMetrics(innerCtx, envelopeStream) } - cfr.settings.Logger.Debug("cloudfoundry metrics streamer stopped") }() @@ -193,7 +186,7 @@ func (cfr *cloudFoundryReceiver) streamMetrics( if libraryMetrics.Len() > 0 { obsCtx := cfr.obsrecv.StartMetricsOp(ctx) - err := cfr.nextMetricsConsumer.ConsumeMetrics(ctx, metrics) + err := cfr.nextMetrics.ConsumeMetrics(ctx, metrics) cfr.obsrecv.EndMetricsOp(obsCtx, dataFormat, metrics.DataPointCount(), err) } } @@ -224,7 +217,7 @@ func (cfr *cloudFoundryReceiver) streamLogs( if libraryLogs.Len() > 0 { obsCtx := cfr.obsrecv.StartLogsOp(ctx) - err := cfr.nextLogsConsumer.ConsumeLogs(ctx, logs) + err := cfr.nextLogs.ConsumeLogs(ctx, logs) cfr.obsrecv.EndLogsOp(obsCtx, dataFormat, logs.LogRecordCount(), err) } }