Skip to content

Commit

Permalink
[receiver/cloudfoundryreceiver] Rename nextConsumers fields and remov…
Browse files Browse the repository at this point in the history
…e telemetryType from cloudFoundryReceiver struct

Co-authored-by: Cem Deniz Kabakci <cem.kabakci@springer.com>
  • Loading branch information
jriguera and CemDK committed May 23, 2024
1 parent af160a4 commit 928f6a4
Showing 1 changed file with 26 additions and 33 deletions.
59 changes: 26 additions & 33 deletions receiver/cloudfoundryreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ const (

// newCloudFoundryReceiver implements the receiver.Metrics for Cloud Foundry protocol.
type cloudFoundryReceiver struct {
settings component.TelemetrySettings
cancel context.CancelFunc
config Config
nextMetricsConsumer consumer.Metrics
nextLogsConsumer consumer.Logs
// nextTracesConsumer consumer.Traces
settings component.TelemetrySettings
cancel context.CancelFunc
config Config
nextMetrics consumer.Metrics
nextLogs consumer.Logs
obsrecv *receiverhelper.ObsReport
telemetryType telemetryType
goroutines sync.WaitGroup
receiverStartTime time.Time
}
Expand All @@ -66,12 +64,11 @@ func newCloudFoundryMetricsReceiver(
}

result := &cloudFoundryReceiver{
settings: settings.TelemetrySettings,
config: config,
nextMetricsConsumer: nextConsumer,
telemetryType: telemetryTypeMetrics,
obsrecv: obsrecv,
receiverStartTime: time.Now(),
settings: settings.TelemetrySettings,
config: config,
nextMetrics: nextConsumer,
obsrecv: obsrecv,
receiverStartTime: time.Now(),
}
return result, nil
}
Expand All @@ -93,8 +90,7 @@ func newCloudFoundryLogsReceiver(
result := &cloudFoundryReceiver{
settings: settings.TelemetrySettings,
config: config,
nextLogsConsumer: nextConsumer,
telemetryType: telemetryTypeLogs,
nextLogs: nextConsumer,
obsrecv: obsrecv,
receiverStartTime: time.Now(),
}
Expand Down Expand Up @@ -126,29 +122,26 @@ func (cfr *cloudFoundryReceiver) Start(ctx context.Context, host component.Host)
go func() {
defer cfr.goroutines.Done()
cfr.settings.Logger.Debug("cloud foundry receiver starting")

_, tokenErr = tokenProvider.ProvideToken()
if tokenErr != nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("cloud foundry receiver failed to fetch initial token from UAA: %w", tokenErr)))
return
}

envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID, cfr.telemetryType)
if err != nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope stream: %w", err)))
return
}

switch cfr.telemetryType {
case telemetryTypeMetrics:
cfr.streamMetrics(innerCtx, envelopeStream)
case telemetryTypeLogs:
if cfr.nextLogs != nil {
envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID, telemetryTypeLogs)
if err != nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope log stream receiver: %w", err)))
return
}
cfr.streamLogs(innerCtx, envelopeStream)
case telemetryTypeTraces:
// TODO
// cfr.streamTelemetry(innerCtx, envelopeStream)
} else if cfr.nextMetrics != nil {
envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID, telemetryTypeMetrics)
if err != nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope metrics stream receiver: %w", err)))
return
}
cfr.streamMetrics(innerCtx, envelopeStream)
}

cfr.settings.Logger.Debug("cloudfoundry metrics streamer stopped")
}()

Expand Down Expand Up @@ -193,7 +186,7 @@ func (cfr *cloudFoundryReceiver) streamMetrics(

if libraryMetrics.Len() > 0 {
obsCtx := cfr.obsrecv.StartMetricsOp(ctx)
err := cfr.nextMetricsConsumer.ConsumeMetrics(ctx, metrics)
err := cfr.nextMetrics.ConsumeMetrics(ctx, metrics)
cfr.obsrecv.EndMetricsOp(obsCtx, dataFormat, metrics.DataPointCount(), err)
}
}
Expand Down Expand Up @@ -224,7 +217,7 @@ func (cfr *cloudFoundryReceiver) streamLogs(

if libraryLogs.Len() > 0 {
obsCtx := cfr.obsrecv.StartLogsOp(ctx)
err := cfr.nextLogsConsumer.ConsumeLogs(ctx, logs)
err := cfr.nextLogs.ConsumeLogs(ctx, logs)
cfr.obsrecv.EndLogsOp(obsCtx, dataFormat, logs.LogRecordCount(), err)
}
}
Expand Down

0 comments on commit 928f6a4

Please sign in to comment.