Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EMF exporter performance: send EMF logs in batches #2572

Merged
merged 1 commit into from
Mar 11, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func NewEmfExporter(
config configmodels.Exporter,
params component.ExporterCreateParams,
) (component.MetricsExporter, error) {

exp, err := New(config, params)
if err != nil {
return nil, err
Expand All @@ -105,12 +104,23 @@ func NewEmfExporter(
}

func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) {
rms := md.ResourceMetrics()
labels := map[string]string{}
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
am := rm.Resource().Attributes()
if am.Len() > 0 {
am.ForEach(func(k string, v pdata.AttributeValue) {
labels[k] = v.StringVal()
})
}
Comment on lines +112 to +116
Copy link
Member

@mxiamxia mxiamxia Mar 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: why can't we directly take the labels from groupedMetrics.Labels?

Copy link
Contributor Author

@bjrara bjrara Mar 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The labels only serve the purpose of logging. They should be simple and timely. I separated translateOTelToGroupedMetric and label extraction in the latest commit. Another concern is that since groupedMetrics.Labels contains extra labels from datapoints, they would blow up the info log.

}
emf.logger.Info("Start processing resource metrics", zap.Any("labels", labels))

groupedMetrics := make(map[interface{}]*GroupedMetric)
expConfig := emf.config.(*Config)
defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)

rms := md.ResourceMetrics()

for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
emf.metricTranslator.translateOTelToGroupedMetric(&rm, groupedMetrics, expConfig)
Expand All @@ -133,14 +143,23 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (dr
err = wrapErrorIfBadRequest(&returnError)
return
}
returnError = pusher.ForceFlush()
if returnError != nil {
err = wrapErrorIfBadRequest(&returnError)
return
}
}

for _, pusher := range emf.listPushers() {
returnError := pusher.ForceFlush()
if returnError != nil {
Comment on lines +149 to +151
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching the performance issue here.
Could you combine this code block with the code in func (emf *emfExporter) Shutdown()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

//TODO now we only have one pusher, so it's ok to return after first error occurred
err = wrapErrorIfBadRequest(&returnError)
if err != nil {
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
}
return
}
}

emf.logger.Info("Finish processing resource metrics", zap.Any("labels", labels))

return
}

Expand All @@ -163,6 +182,19 @@ func (emf *emfExporter) getPusher(logGroup, logStream string) Pusher {
return pusher
}

// listPushers returns a flat snapshot of every Pusher currently registered
// across all log-group/log-stream combinations. The two-level
// groupStreamToPusherMap is walked under pusherMapLock so the snapshot is
// consistent with concurrent getPusher calls; the returned slice is safe to
// iterate without holding the lock.
func (emf *emfExporter) listPushers() []Pusher {
	emf.pusherMapLock.Lock()
	defer emf.pusherMapLock.Unlock()

	// Idiomatic nil slice: append works on nil, and callers only range
	// over the result, so there is no need to allocate an empty slice.
	var pushers []Pusher
	for _, pusherMap := range emf.groupStreamToPusherMap {
		for _, pusher := range pusherMap {
			pushers = append(pushers, pusher)
		}
	}
	return pushers
}

func (emf *emfExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
exporterCtx := obsreport.ExporterContext(ctx, "emf.exporterFullName")

Expand All @@ -172,20 +204,12 @@ func (emf *emfExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) er

// Shutdown stops the exporter and is invoked during shutdown.
func (emf *emfExporter) Shutdown(ctx context.Context) error {
emf.pusherMapLock.Lock()
defer emf.pusherMapLock.Unlock()

var err error
for _, streamToPusherMap := range emf.groupStreamToPusherMap {
for _, pusher := range streamToPusherMap {
if pusher != nil {
returnError := pusher.ForceFlush()
if returnError != nil {
err = wrapErrorIfBadRequest(&returnError)
}
if err != nil {
emf.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next pusher.", zap.Error(err))
}
for _, pusher := range emf.listPushers() {
returnError := pusher.ForceFlush()
if returnError != nil {
err := wrapErrorIfBadRequest(&returnError)
if err != nil {
emf.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next pusher.", zap.Error(err))
}
}
}
Expand Down