Publish Resource Entity Relationships from the Collector #169

Open · wants to merge 11 commits into base: main
2 changes: 1 addition & 1 deletion distribution/go.mod
@@ -207,7 +207,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/briandowns/spinner v1.23.0 // indirect
github.com/cardinalhq/cardinalhq-otel-collector/internal v0.0.0 // indirect
github.com/cardinalhq/oteltools v0.8.1 // indirect
github.com/cardinalhq/oteltools v0.9.0 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
4 changes: 2 additions & 2 deletions distribution/go.sum
@@ -339,8 +339,8 @@ github.com/briandowns/spinner v1.23.0 h1:alDF2guRWqa/FOZZYWjlMIx2L6H0wyewPxo/CH4
github.com/briandowns/spinner v1.23.0/go.mod h1:rPG4gmXeN3wQV/TsAY4w8lPdIM6RX3yqeBQJSrbXjuE=
github.com/cardinalhq/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver v0.0.0-20241218023339-09be78c4a419 h1:jd83blQ8zskkEghG592guA2+DU5a5xqBlWMvQW4UQFc=
github.com/cardinalhq/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver v0.0.0-20241218023339-09be78c4a419/go.mod h1:EHgHBruNR8bx3g/wwr0oLHd9z0QjKTu9DjoMc2xYBWU=
github.com/cardinalhq/oteltools v0.8.1 h1:EP0B6DwtdCIJfpupeo2PIuFW5SoOrFUC+hT32z8eOy0=
github.com/cardinalhq/oteltools v0.8.1/go.mod h1:55norMwFppSyXrsaCTmltG2phmyzkum2w28xJ6z4PjE=
github.com/cardinalhq/oteltools v0.9.0 h1:KdizpcdVFS7sRhhXhbZVx/2y6px6qgcIowB4GMVWx1o=
github.com/cardinalhq/oteltools v0.9.0/go.mod h1:55norMwFppSyXrsaCTmltG2phmyzkum2w28xJ6z4PjE=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
2 changes: 1 addition & 1 deletion processor/chqstatsprocessor/go.mod
@@ -6,7 +6,7 @@ toolchain go1.23.3

require (
github.com/cardinalhq/cardinalhq-otel-collector/extension/chqconfigextension v0.0.0
github.com/cardinalhq/oteltools v0.8.1
github.com/cardinalhq/oteltools v0.9.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-multierror v1.1.1
github.com/stretchr/testify v1.10.0
4 changes: 2 additions & 2 deletions processor/chqstatsprocessor/go.sum
@@ -14,8 +14,8 @@ github.com/apache/datasketches-go v0.0.0-20240723070244-57d8af6c2e71 h1:YfjfaPjG
github.com/apache/datasketches-go v0.0.0-20240723070244-57d8af6c2e71/go.mod h1:d6gftfreCcQJZaXAYyyYkHDwbcrGo63PjgWEYRdHZYw=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cardinalhq/oteltools v0.8.1 h1:EP0B6DwtdCIJfpupeo2PIuFW5SoOrFUC+hT32z8eOy0=
github.com/cardinalhq/oteltools v0.8.1/go.mod h1:55norMwFppSyXrsaCTmltG2phmyzkum2w28xJ6z4PjE=
github.com/cardinalhq/oteltools v0.9.0 h1:KdizpcdVFS7sRhhXhbZVx/2y6px6qgcIowB4GMVWx1o=
github.com/cardinalhq/oteltools v0.9.0/go.mod h1:55norMwFppSyXrsaCTmltG2phmyzkum2w28xJ6z4PjE=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
7 changes: 6 additions & 1 deletion processor/chqstatsprocessor/logs.go
@@ -63,12 +63,16 @@ func (e *statsProc) ConsumeLogs(ctx context.Context, ld plog.Logs) (plog.Logs, e

for i := 0; i < ld.ResourceLogs().Len(); i++ {
rl := ld.ResourceLogs().At(i)
serviceName := getServiceName(rl.Resource().Attributes())
resourceAttributes := rl.Resource().Attributes()
globalEntityMap := e.resourceEntityCache.ProvisionResourceAttributes(resourceAttributes)

serviceName := getServiceName(resourceAttributes)
for j := 0; j < rl.ScopeLogs().Len(); j++ {
sl := rl.ScopeLogs().At(j)
for k := 0; k < sl.LogRecords().Len(); k++ {
lr := sl.LogRecords().At(k)
fp := getFingerprint(lr.Attributes())
e.resourceEntityCache.ProvisionRecordAttributes(globalEntityMap, lr.Attributes())
if err := e.recordLog(now, ee, serviceName, fp, rl, sl, lr); err != nil {
e.logger.Error("Failed to record log", zap.Error(err))
}
@@ -152,6 +156,7 @@ func (e *statsProc) sendLogStats(statsList []*chqpb.EventStats) {
}
stat.Exemplar = exemplarBytes.([]byte)
}

wrapper := &chqpb.EventStatsReport{
SubmittedAt: time.Now().UnixMilli(),
Stats: statsList}
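The logs path above establishes the pattern reused for metrics and traces below: provision the resource-level attributes once per ResourceLogs, then reuse the returned entity map when provisioning each record's attributes. A minimal sketch of exercising the cache in isolation, assuming only the graph API surface visible in this diff (NewResourceEntityCache, ProvisionResourceAttributes, ProvisionRecordAttributes, GetAllEntities) and hand-built pdata maps with hypothetical attribute values:

package main

import (
	"fmt"

	"github.com/cardinalhq/oteltools/pkg/graph"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	cache := graph.NewResourceEntityCache()

	// Resource-level attributes, provisioned once per resource (hypothetical values).
	rattr := pcommon.NewMap()
	rattr.PutStr("service.name", "checkout")
	rattr.PutStr("k8s.pod.name", "checkout-5d9f7c9b8-abcde")
	entityMap := cache.ProvisionResourceAttributes(rattr)

	// Record-level attributes, provisioned per log record / span / datapoint.
	lattr := pcommon.NewMap()
	lattr.PutStr("http.route", "/cart")
	cache.ProvisionRecordAttributes(entityMap, lattr)

	// The processor drains the cache on a timer and publishes the entities in batches.
	fmt.Println("entities tracked:", len(cache.GetAllEntities()))
}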
15 changes: 9 additions & 6 deletions processor/chqstatsprocessor/metrics.go
@@ -19,6 +19,7 @@ import (
"context"
"errors"
"fmt"
"github.com/cardinalhq/oteltools/pkg/graph"
"io"
"net/http"
"strconv"
@@ -49,6 +50,7 @@ func (e *statsProc) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) (pme
rm := md.ResourceMetrics().At(i)
serviceName := getServiceName(rm.Resource().Attributes())
rattr := rm.Resource().Attributes()
globalEntityMap := e.resourceEntityCache.ProvisionResourceAttributes(rattr)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
ilm := rm.ScopeMetrics().At(j)
sattr := ilm.Scope().Attributes()
@@ -64,27 +66,27 @@ func (e *statsProc) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) (pme

for l := 0; l < m.Gauge().DataPoints().Len(); l++ {
dp := m.Gauge().DataPoints().At(l)
e.processDatapoint(ee, metricName, pmetric.MetricTypeGauge.String(), serviceName, extra, rattr, sattr, dp.Attributes())
e.processDatapoint(ee, metricName, pmetric.MetricTypeGauge.String(), serviceName, extra, rattr, sattr, dp.Attributes(), globalEntityMap)
}
case pmetric.MetricTypeSum:
for l := 0; l < m.Sum().DataPoints().Len(); l++ {
dp := m.Sum().DataPoints().At(l)
e.processDatapoint(ee, metricName, pmetric.MetricTypeSum.String(), serviceName, extra, rattr, sattr, dp.Attributes())
e.processDatapoint(ee, metricName, pmetric.MetricTypeSum.String(), serviceName, extra, rattr, sattr, dp.Attributes(), globalEntityMap)
}
case pmetric.MetricTypeHistogram:
for l := 0; l < m.Histogram().DataPoints().Len(); l++ {
dp := m.Histogram().DataPoints().At(l)
e.processDatapoint(ee, metricName, pmetric.MetricTypeHistogram.String(), serviceName, extra, rattr, sattr, dp.Attributes())
e.processDatapoint(ee, metricName, pmetric.MetricTypeHistogram.String(), serviceName, extra, rattr, sattr, dp.Attributes(), globalEntityMap)
}
case pmetric.MetricTypeSummary:
for l := 0; l < m.Summary().DataPoints().Len(); l++ {
dp := m.Summary().DataPoints().At(l)
e.processDatapoint(ee, metricName, pmetric.MetricTypeSummary.String(), serviceName, extra, rattr, sattr, dp.Attributes())
e.processDatapoint(ee, metricName, pmetric.MetricTypeSummary.String(), serviceName, extra, rattr, sattr, dp.Attributes(), globalEntityMap)
}
case pmetric.MetricTypeExponentialHistogram:
for l := 0; l < m.ExponentialHistogram().DataPoints().Len(); l++ {
dp := m.ExponentialHistogram().DataPoints().At(l)
e.processDatapoint(ee, metricName, pmetric.MetricTypeExponentialHistogram.String(), serviceName, extra, rattr, sattr, dp.Attributes())
e.processDatapoint(ee, metricName, pmetric.MetricTypeExponentialHistogram.String(), serviceName, extra, rattr, sattr, dp.Attributes(), globalEntityMap)
}
}
}
@@ -98,7 +100,8 @@ func (e *statsProc) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) (pme
return md, nil
}

func (e *statsProc) processDatapoint(environment translate.Environment, metricName, metricType, serviceName string, extra map[string]string, rattr, sattr, dattr pcommon.Map) {
func (e *statsProc) processDatapoint(environment translate.Environment, metricName, metricType, serviceName string, extra map[string]string, rattr, sattr, dattr pcommon.Map, globalEntityMap map[string]*graph.ResourceEntity) {
e.resourceEntityCache.ProvisionRecordAttributes(globalEntityMap, dattr)
tid := translate.CalculateTID(extra, rattr, sattr, dattr, "metric", environment)
if err := e.recordDatapoint(environment, metricName, metricType, serviceName, tid, rattr, sattr, dattr); err != nil {
e.logger.Error("Failed to record datapoint", zap.Error(err))
113 changes: 102 additions & 11 deletions processor/chqstatsprocessor/processor.go
@@ -15,10 +15,15 @@
package chqstatsprocessor

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/cardinalhq/oteltools/pkg/graph"
"hash/fnv"
"io"
"net/http"
"os"
"sync/atomic"
@@ -86,6 +91,8 @@ type statsProc struct {
traceExemplars *LRUCache
metricExemplars *LRUCache

resourceEntityCache *graph.ResourceEntityCache

jsonMarshaller otelJsonMarshaller

logStatsEnrichments atomic.Pointer[[]ottl.StatsEnrichment]
@@ -101,17 +108,18 @@

func newStatsProc(config *Config, ttype string, set processor.Settings) (*statsProc, error) {
dog := &statsProc{
id: set.ID,
ttype: ttype,
config: config,
httpClientSettings: config.ClientConfig,
telemetrySettings: set.TelemetrySettings,
jsonMarshaller: newMarshaller(),
logExemplars: NewLRUCache(1000, 5*time.Minute),
traceExemplars: NewLRUCache(1000, 5*time.Minute),
metricExemplars: NewLRUCache(1000, 5*time.Minute),
logger: set.Logger,
podName: os.Getenv("POD_NAME"),
id: set.ID,
ttype: ttype,
config: config,
httpClientSettings: config.ClientConfig,
telemetrySettings: set.TelemetrySettings,
jsonMarshaller: newMarshaller(),
logExemplars: NewLRUCache(1000, 5*time.Minute),
traceExemplars: NewLRUCache(1000, 5*time.Minute),
metricExemplars: NewLRUCache(1000, 5*time.Minute),
resourceEntityCache: graph.NewResourceEntityCache(),
logger: set.Logger,
podName: os.Getenv("POD_NAME"),
}

//if os.Getenv("ENABLE_METRIC_METRICS") == "true" {
@@ -212,6 +220,19 @@ func (e *statsProc) Start(ctx context.Context, host component.Host) error {

e.idsFromEnv = e.config.IDSource == "env"

ticker := time.NewTicker(5 * time.Minute)
go func() {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
e.publishResourceEntities(ctx)
}
}
}()

return nil
}

@@ -291,3 +312,73 @@ func hashString(s string) int64 {
func (e *statsProc) toExemplarKey(serviceName string, fingerprint int64) int64 {
return hashString(fmt.Sprintf("%s-%d", serviceName, fingerprint))
}

func (e *statsProc) publishResourceEntities(ctx context.Context) {
allEntitiesMap := e.resourceEntityCache.GetAllEntities()
if len(allEntitiesMap) == 0 {
return
}
allEntitiesSlice := make([]*graph.ResourceEntity, 0, len(allEntitiesMap))
for _, entity := range allEntitiesMap {
allEntitiesSlice = append(allEntitiesSlice, entity)
}

const batchSize = 100
total := len(allEntitiesSlice)
for start := 0; start < total; start += batchSize {
end := start + batchSize
if end > total {
end = total
}
batch := allEntitiesSlice[start:end]
if err := e.postEntityRelationships(ctx, batch); err != nil {
e.logger.Error("Failed to send resource entities", zap.Error(err))
}
}
}

func (e *statsProc) postEntityRelationships(ctx context.Context, entities []*graph.ResourceEntity) error {
b, err := json.Marshal(entities)
if err != nil {
return err
}

var compressedData bytes.Buffer
gzipWriter := gzip.NewWriter(&compressedData)
if _, err = gzipWriter.Write(b); err != nil {
return err
}
if err = gzipWriter.Close(); err != nil {
return err
}

endpoint := e.config.Endpoint + "/api/v1/entityRelationships"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, &compressedData)
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")

resp, err := e.httpClient.Do(req)
if err != nil {
return err
}
defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
e.logger.Error("Failed to close response body", zap.Error(closeErr))
}
}()

body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
e.logger.Error("Failed to send resource entities",
zap.Int("status", resp.StatusCode),
zap.String("body", string(body)),
)
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

return nil
}
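Taken together, Start, publishResourceEntities, and postEntityRelationships drain the entity cache every five minutes and POST gzip-compressed JSON arrays of up to 100 graph.ResourceEntity values to <endpoint>/api/v1/entityRelationships, treating any status other than 200 or 202 as a failure. A sketch of a compatible receiving handler, assuming nothing about the ResourceEntity schema (it lives in oteltools, not in this diff), so the payload is decoded as raw JSON:

package main

import (
	"compress/gzip"
	"encoding/json"
	"log"
	"net/http"
)

func entityRelationshipsHandler(w http.ResponseWriter, r *http.Request) {
	// The sender always gzip-compresses the body and sets Content-Encoding: gzip.
	gz, err := gzip.NewReader(r.Body)
	if err != nil {
		http.Error(w, "bad gzip body", http.StatusBadRequest)
		return
	}
	defer gz.Close()

	// Each request carries one batch: a JSON array of up to 100 entities.
	var entities []json.RawMessage
	if err := json.NewDecoder(gz).Decode(&entities); err != nil {
		http.Error(w, "bad JSON payload", http.StatusBadRequest)
		return
	}
	log.Printf("received %d resource entities", len(entities))

	// The sender accepts either 200 OK or 202 Accepted.
	w.WriteHeader(http.StatusAccepted)
}

func main() {
	http.HandleFunc("/api/v1/entityRelationships", entityRelationshipsHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}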
6 changes: 5 additions & 1 deletion processor/chqstatsprocessor/traces.go
@@ -44,11 +44,14 @@ func (e *statsProc) ConsumeTraces(ctx context.Context, td ptrace.Traces) (ptrace

for i := 0; i < td.ResourceSpans().Len(); i++ {
rs := td.ResourceSpans().At(i)
serviceName := getServiceName(rs.Resource().Attributes())
resourceAttributes := rs.Resource().Attributes()
globalEntityMap := e.resourceEntityCache.ProvisionResourceAttributes(resourceAttributes)
serviceName := getServiceName(resourceAttributes)
for j := 0; j < rs.ScopeSpans().Len(); j++ {
iss := rs.ScopeSpans().At(j)
for k := 0; k < iss.Spans().Len(); k++ {
sr := iss.Spans().At(k)
e.resourceEntityCache.ProvisionRecordAttributes(globalEntityMap, sr.Attributes())
isSlow := false
if isslowValue, found := sr.Attributes().Get(translate.CardinalFieldSpanIsSlow); found {
isSlow = isslowValue.Bool()
@@ -144,6 +147,7 @@ func (e *statsProc) sendSpanStats(statsList []*chqpb.EventStats) {
}
stat.Exemplar = exemplarBytes.([]byte)
}

wrapper := &chqpb.EventStatsReport{
SubmittedAt: time.Now().UnixMilli(),
Stats: statsList}
3 changes: 0 additions & 3 deletions processor/fingerprintprocessor/processor.go
@@ -46,8 +46,6 @@ type fingerprintProcessor struct {
// for logs
logFingerprinter fingerprinter.Fingerprinter

// for spans
traceFingerprinter fingerprinter.Fingerprinter
estimators map[uint64]*SlidingEstimatorStat
estimatorWindowSize int
estimatorInterval int64
@@ -68,7 +66,6 @@ func newProcessor(config *Config, ttype string, set processor.Settings) (*finger
dog.logFingerprinter = fingerprinter.NewFingerprinter(fingerprinter.WithMaxTokens(30))

case "traces":
dog.traceFingerprinter = fingerprinter.NewFingerprinter(fingerprinter.WithMaxTokens(30))
dog.estimators = make(map[uint64]*SlidingEstimatorStat)
dog.estimatorWindowSize = config.TracesConfig.EstimatorWindowSize
dog.estimatorInterval = config.TracesConfig.EstimatorInterval