Skip to content

Commit

Permalink
Don't drop RUM unsampled transactions (#6669)
Browse files Browse the repository at this point in the history
* Don't drop RUM unsampled transactions

* Revert to using global monitoring registry

* Fix systemtest

* Update comment

* Fix systemtest even more

* Update changelog
  • Loading branch information
axw authored Nov 22, 2021
1 parent 0b3a57a commit 8e63576
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 103 deletions.
11 changes: 7 additions & 4 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import (
"github.com/elastic/apm-server/model/modelindexer"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sampling"
"github.com/elastic/apm-server/sourcemap"
)

Expand Down Expand Up @@ -533,9 +532,13 @@ func (s *serverRunner) run(listener net.Listener) error {
defer closeFinalBatchProcessor(s.backgroundContext)

batchProcessor = append(batchProcessor,
// The server always discards unsampled transactions. It is important that this
// is done just before calling the publisher to avoid affecting aggregations.
sampling.NewDiscardUnsampledBatchProcessor(),
// The server always drops non-RUM unsampled transactions. We store RUM unsampled
// transactions as they are needed by the User Experience app, which performs
// aggregations over dimensions that are not available in transaction metrics.
//
// It is important that this is done just before calling the publisher to
// avoid affecting aggregations.
modelprocessor.NewDropUnsampled(false /* don't drop RUM unsampled transactions*/),
modelprocessor.DroppedSpansStatsDiscarder{},
finalBatchProcessor,
)
Expand Down
2 changes: 1 addition & 1 deletion changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits]
- Removed unsupported libbeat `processors` configuration {pull}6474[6474]
- Removed `apm-server.aggregation.transactions.enabled` configuration option {pull}6495[6495]
- Removed `apm-server.aggregation.service_destinations.enabled` configuration option {pull}6503[6503]
- Removed `apm-server.sampling.keep_unsampled` configuration option {pull}6514[6514]
- Removed `apm-server.sampling.keep_unsampled` configuration option; non-RUM unsampled transactions are always dropped {pull}6514[6514] {pull}6669[6669]
- Removed `apm-server.jaeger` configuration options {pull}6560[6560]
- Removed `apm-server.instrumentation` configuration options in favor of `instrumentation` {pull}6560[6560]
- Removed `apm-server.rum.{allowed_service,event_rate}` configuration option in favor of `apm-server.auth.anonymous.{allow_service,rate_limit}` {pull}6560[6560]
Expand Down
26 changes: 17 additions & 9 deletions sampling/sampling.go → model/modelprocessor/dropunsampled.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,34 @@
// specific language governing permissions and limitations
// under the License.

package sampling
package modelprocessor

import (
"context"

"github.com/elastic/apm-server/model"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/model"
)

var (
monitoringRegistry = monitoring.Default.NewRegistry("apm-server.sampling")
transactionsDroppedCounter = monitoring.NewInt(monitoringRegistry, "transactions_dropped")
)

// NewDiscardUnsampledBatchProcessor returns a model.BatchProcessor which
// discards unsampled transactions.
// NewDropUnsampled returns a model.BatchProcessor which drops unsampled transaction events,
// and counts them in a metric named `apm-server.sampling.transactions_dropped`.
//
// The returned model.BatchProcessor does not guarantee order preservation
// of events retained in the batch.
// If dropRUM is false, only non-RUM unsampled transaction events are dropped; otherwise all
// unsampled transaction events are dropped.
//
// TODO(axw) only discard non-RUM unsampled transactions.
func NewDiscardUnsampledBatchProcessor() model.BatchProcessor {
// This model.BatchProcessor does not guarantee order preservation of the remaining events.
func NewDropUnsampled(dropRUM bool) model.BatchProcessor {
return model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error {
events := *batch
for i := 0; i < len(events); {
event := events[i]
if event.Processor != model.TransactionProcessor || event.Transaction == nil || event.Transaction.Sampled {
if !shouldDropUnsampled(&event, dropRUM) {
i++
continue
}
Expand All @@ -56,3 +57,10 @@ func NewDiscardUnsampledBatchProcessor() model.BatchProcessor {
return nil
})
}

func shouldDropUnsampled(event *model.APMEvent, dropRUM bool) bool {
if event.Processor != model.TransactionProcessor || event.Transaction == nil || event.Transaction.Sampled {
return false
}
return dropRUM || !isRUMAgentName(event.Agent.Name)
}
95 changes: 95 additions & 0 deletions model/modelprocessor/dropunsampled_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package modelprocessor_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/beats/v7/libbeat/monitoring"
)

func TestNewDropUnsampled(t *testing.T) {
for _, dropRUM := range []bool{false, true} {
batchProcessor := modelprocessor.NewDropUnsampled(dropRUM)
counter := monitoring.Default.Get("apm-server.sampling.transactions_dropped").(*monitoring.Int)
counter.Set(0)

rumAgent := model.Agent{Name: "rum-js"}
t1 := &model.Transaction{ID: "t1", Sampled: false}
t2 := &model.Transaction{ID: "t2", Sampled: true}
t3 := &model.Transaction{ID: "t3", Sampled: false}
t4 := &model.Transaction{ID: "t4", Sampled: true}
t5 := &model.Transaction{ID: "t5", Sampled: false}

batch := model.Batch{{
Processor: model.TransactionProcessor,
Transaction: t1,
}, {
Processor: model.TransactionProcessor,
Transaction: t2,
}, {
Processor: model.ErrorProcessor,
// Transaction.Sampled should be disregarded, as
// Processor == ErrorProcessor, i.e. this is an
// error event with the transaction.sampled field.
Transaction: &model.Transaction{},
}, {
Processor: model.TransactionProcessor,
Transaction: t3,
}, {
Processor: model.TransactionProcessor,
Transaction: t4,
}, {
Agent: rumAgent,
Processor: model.TransactionProcessor,
Transaction: t5,
}}

err := batchProcessor.ProcessBatch(context.Background(), &batch)
assert.NoError(t, err)

var expectedTransactionsDropped int64 = 3
expectedRemainingBatch := model.Batch{
{Processor: model.TransactionProcessor, Transaction: t4},
{Processor: model.TransactionProcessor, Transaction: t2},
{Processor: model.ErrorProcessor, Transaction: &model.Transaction{}},
}
if !dropRUM {
expectedTransactionsDropped--
expectedRemainingBatch = append(expectedRemainingBatch, model.APMEvent{
Agent: rumAgent, Processor: model.TransactionProcessor, Transaction: t5,
})
}

// Note: this processor is not order-preserving.
assert.ElementsMatch(t, expectedRemainingBatch, batch)
expectedMonitoring := monitoring.MakeFlatSnapshot()
expectedMonitoring.Ints["apm-server.sampling.transactions_dropped"] = expectedTransactionsDropped
snapshot := monitoring.CollectFlatSnapshot(
monitoring.Default,
monitoring.Full,
false, // expvar
)
assert.Equal(t, expectedMonitoring, snapshot)
}
}
80 changes: 0 additions & 80 deletions sampling/sampling_test.go

This file was deleted.

8 changes: 8 additions & 0 deletions systemtest/apmservertest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,11 @@ func (DefaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
m.Service.Node = nil
m.Service.Name = "systemtest"
}

// EventMetadataFilterFunc is a function type that implements EventMetadataFilter.
type EventMetadataFilterFunc func(*EventMetadata)

// FilterEventMetadata calls f(m).
func (f EventMetadataFilterFunc) FilterEventMetadata(m *EventMetadata) {
f(m)
}
Loading

0 comments on commit 8e63576

Please sign in to comment.