diff --git a/exporter/opsrampotlpexporter/config.go b/exporter/opsrampotlpexporter/config.go index fb936168bf92..8b1a9c08f409 100644 --- a/exporter/opsrampotlpexporter/config.go +++ b/exporter/opsrampotlpexporter/config.go @@ -17,6 +17,7 @@ package opsrampotlpexporter // import "go.opentelemetry.io/collector/exporter/ot import ( "errors" "fmt" + "time" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configgrpc" @@ -69,6 +70,7 @@ type Config struct { Security SecuritySettings `mapstructure:"security"` configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. Masking []MaskingSettings `mapstructure:"masking"` + ExpirationSkip time.Duration `mapstructure"expiration_skip"` } var _ config.Exporter = (*Config)(nil) diff --git a/exporter/opsrampotlpexporter/otlp.go b/exporter/opsrampotlpexporter/otlp.go index bff3913838eb..cf59bc02c8eb 100644 --- a/exporter/opsrampotlpexporter/otlp.go +++ b/exporter/opsrampotlpexporter/otlp.go @@ -174,6 +174,9 @@ func (e *exporter) pushLogs(ctx context.Context, ld plog.Logs) error { if e.config.Masking != nil { e.applyMasking(ld) } + if e.config.ExpirationSkip != 0 { + e.skipExpired(ld) + } req := plogotlp.NewRequestFromLogs(ld) @@ -306,3 +309,20 @@ func (e *exporter) applyMasking(ld plog.Logs) { } } + +func (e *exporter) skipExpired(ld plog.Logs) { + for i := 0; i < ld.ResourceLogs().Len(); i++ { + resLogs := ld.ResourceLogs().At(i) + + for k := 0; k < resLogs.ScopeLogs().Len(); k++ { + resLogs.ScopeLogs().At(k).LogRecords().RemoveIf(func(el plog.LogRecord) bool { + fmt.Println(el.Timestamp().AsTime().String(), time.Now().Add(-e.config.ExpirationSkip).String()) + if el.Timestamp().AsTime().Before(time.Now().Add(-e.config.ExpirationSkip)) { + return true + } + return false + }) + + } + } +} diff --git a/exporter/opsrampotlpexporter/otlp_test.go b/exporter/opsrampotlpexporter/otlp_test.go index f3f3ab6a7f7b..cb8dce116f69 100644 --- a/exporter/opsrampotlpexporter/otlp_test.go +++ b/exporter/opsrampotlpexporter/otlp_test.go @@ -17,7 +17,6 @@ package opsrampotlpexporter import ( "context" "fmt" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" "net" "path/filepath" "regexp" @@ -26,6 +25,9 @@ import ( "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" + "go.opentelemetry.io/collector/pdata/pcommon" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -727,3 +729,59 @@ func TestGetAuthToken(t *testing.T) { assert.Nil(t, err) fmt.Println(token) } + +func TestSkipExpiredLogs(t *testing.T) { + + half := 30 * time.Minute + tests := []struct { + name string + expiration time.Duration + expected int + }{ + { + expiration: 1*time.Hour + half, + expected: 2, + }, + { + expiration: 2*time.Hour + half, + expected: 3, + }, + { + expiration: 3*time.Hour + half, + expected: 4, + }, + { + expiration: 5*time.Hour + half, + expected: 6, + }, + { + expiration: 9*time.Hour + half, + expected: 10, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ld := generateTestEntries() + e := exporter{config: &Config{ExpirationSkip: tt.expiration}} + e.skipExpired(ld) + assert.Equal(t, ld.LogRecordCount(), tt.expected) + }) + } + +} + +func generateTestEntries() plog.Logs { + ld := plog.NewLogs() + rl0 := ld.ResourceLogs().AppendEmpty() + sc := rl0.ScopeLogs().AppendEmpty() + for i := 0; i < 10; i++ { + el := sc.LogRecords().AppendEmpty() + duration := time.Hour * time.Duration(i) + el.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-duration))) + el.Body().SetStringVal(fmt.Sprintf("This is entry # %q", i)) + } + + return ld + +}