Skip to content

Commit

Permalink
feat: Introduce policy to received_bytes (#16056)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Introduce the `policy` label to the received_bytes metric family to be able to distinguish between different policies ingestion.
  • Loading branch information
DylanGuedes authored Feb 10, 2025
1 parent d9892b4 commit affedaf
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 154 deletions.
34 changes: 17 additions & 17 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,14 @@ const (
OTLPSeverityNumber = "severity_number"
)

func newPushStats() *Stats {
return &Stats{
LogLinesBytes: map[time.Duration]int64{},
StructuredMetadataBytes: map[time.Duration]int64{},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{},
}
}

func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, _ PolicyResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
stats := newPushStats()
func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, policyResolver PolicyResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
stats := NewPushStats()
otlpLogs, err := extractLogs(r, stats)
if err != nil {
return nil, nil, err
}

req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger)
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger, policyResolver)
return req, stats, nil
}

Expand Down Expand Up @@ -101,7 +93,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger) *logproto.PushRequest {
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger, policyForResolver PolicyResolver) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand Down Expand Up @@ -196,8 +188,13 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten

resourceAttributesAsStructuredMetadataSize := loki_util.StructuredMetadataSize(resourceAttributesAsStructuredMetadata)
retentionPeriodForUser := tenantsRetention.RetentionPeriodFor(userID, lbs)
policy := policyForResolver(userID, lbs)

stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize)
if _, ok := stats.StructuredMetadataBytes[policy]; !ok {
stats.StructuredMetadataBytes[policy] = make(map[time.Duration]int64)
}

stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize)
totalBytesReceived += int64(resourceAttributesAsStructuredMetadataSize)

stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], resourceAttributesAsStructuredMetadata...)
Expand Down Expand Up @@ -250,7 +247,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
}

scopeAttributesAsStructuredMetadataSize := loki_util.StructuredMetadataSize(scopeAttributesAsStructuredMetadata)
stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize)
stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize)
totalBytesReceived += int64(scopeAttributesAsStructuredMetadataSize)

stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], scopeAttributesAsStructuredMetadata...)
Expand All @@ -274,12 +271,15 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
pushRequestsByStream[labelsStr] = stream

metadataSize := int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
stats.StructuredMetadataBytes[retentionPeriodForUser] += metadataSize
stats.LogLinesBytes[retentionPeriodForUser] += int64(len(entry.Line))
stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += metadataSize
if _, ok := stats.LogLinesBytes[policy]; !ok {
stats.LogLinesBytes[policy] = make(map[time.Duration]int64)
}
stats.LogLinesBytes[policy][retentionPeriodForUser] += int64(len(entry.Line))
totalBytesReceived += metadataSize
totalBytesReceived += int64(len(entry.Line))

stats.NumLines++
stats.PolicyNumLines[policy]++
if entry.Timestamp.After(stats.MostRecentEntryTimestamp) {
stats.MostRecentEntryTimestamp = entry.Timestamp
}
Expand Down
121 changes: 84 additions & 37 deletions pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
return plog.NewLogs()
},
expectedPushRequest: logproto.PushRequest{},
expectedStats: *newPushStats(),
expectedStats: *NewPushStats(),
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
},
{
Expand All @@ -64,7 +64,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
return ld
},
expectedPushRequest: logproto.PushRequest{},
expectedStats: *newPushStats(),
expectedStats: *NewPushStats(),
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
},
{
Expand Down Expand Up @@ -92,12 +92,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 1,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 9,
PolicyNumLines: map[string]int64{
"service-1-policy": 1,
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 0,
LogLinesBytes: PolicyWithRetentionWithBytes{
"service-1-policy": {
time.Hour: 9,
},
},
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"service-1-policy": {
time.Hour: 0,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: nil,
Expand Down Expand Up @@ -131,12 +137,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 1,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 9,
PolicyNumLines: map[string]int64{
"others": 1,
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 0,
LogLinesBytes: PolicyWithRetentionWithBytes{
"others": {
time.Hour: 9,
},
},
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"others": {
time.Hour: 0,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: nil,
Expand Down Expand Up @@ -170,12 +182,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 1,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 9,
PolicyNumLines: map[string]int64{
"others": 1,
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 0,
LogLinesBytes: PolicyWithRetentionWithBytes{
"others": {
time.Hour: 9,
},
},
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"others": {
time.Hour: 0,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: nil,
Expand Down Expand Up @@ -248,12 +266,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 2,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 26,
PolicyNumLines: map[string]int64{
"service-1-policy": 2,
},
LogLinesBytes: PolicyWithRetentionWithBytes{
"service-1-policy": {
time.Hour: 26,
},
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 37,
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"service-1-policy": {
time.Hour: 37,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: []push.LabelAdapter{
Expand Down Expand Up @@ -339,12 +363,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 2,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 26,
PolicyNumLines: map[string]int64{
"service-1-policy": 2,
},
LogLinesBytes: PolicyWithRetentionWithBytes{
"service-1-policy": {
time.Hour: 26,
},
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 97,
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"service-1-policy": {
time.Hour: 97,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: []push.LabelAdapter{
Expand Down Expand Up @@ -490,12 +520,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 2,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 26,
PolicyNumLines: map[string]int64{
"service-1-policy": 2,
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 113,
LogLinesBytes: PolicyWithRetentionWithBytes{
"service-1-policy": {
time.Hour: 26,
},
},
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"service-1-policy": {
time.Hour: 113,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: []push.LabelAdapter{
Expand All @@ -511,8 +547,9 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
stats := newPushStats()
stats := NewPushStats()
tracker := NewMockTracker()

pushReq := otlpToLokiPushRequest(
context.Background(),
tc.generateLogs(),
Expand All @@ -524,16 +561,26 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
stats,
false,
log.NewNopLogger(),
func(_ string, lbs labels.Labels) string {
if lbs.Get("service_name") == "service-1" {
return "service-1-policy"
}
return "others"
},
)
require.Equal(t, tc.expectedPushRequest, *pushReq)
require.Equal(t, tc.expectedStats, *stats)

totalBytes := 0.0
for _, b := range stats.LogLinesBytes {
totalBytes += float64(b)
for _, policyMapping := range stats.LogLinesBytes {
for _, b := range policyMapping {
totalBytes += float64(b)
}
}
for _, b := range stats.StructuredMetadataBytes {
totalBytes += float64(b)
for _, policyMapping := range stats.StructuredMetadataBytes {
for _, b := range policyMapping {
totalBytes += float64(b)
}
}
require.Equal(t, totalBytes, tracker.Total(), "Total tracked bytes must equal total bytes of the stats.")
})
Expand Down
Loading

0 comments on commit affedaf

Please sign in to comment.