From ce87fd854f882209ece6830fe340c2e534b60e1b Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 31 Jul 2024 20:41:38 -0600 Subject: [PATCH] feat: aggregate byte and count metrics * aggregate in the pattern ingester and push back into Loki as a stream --- cmd/loki/loki-local-config.yaml | 2 +- pkg/distributor/distributor.go | 16 ++ pkg/distributor/distributor_test.go | 130 +++++++++-- pkg/distributor/validator.go | 6 +- pkg/loghttp/push/push.go | 74 +++--- pkg/loki/loki.go | 4 +- pkg/loki/modules.go | 8 +- pkg/pattern/aggregation/config.go | 107 +++++++++ pkg/pattern/aggregation/metrics.go | 28 +++ pkg/pattern/aggregation/push.go | 309 ++++++++++++++++++++++++ pkg/pattern/aggregation/push_test.go | 336 +++++++++++++++++++++++++++ pkg/pattern/flush.go | 19 ++ pkg/pattern/ingester.go | 61 ++++- pkg/pattern/instance.go | 199 ++++++++++++++-- pkg/pattern/tee.go | 53 ++++- 15 files changed, 1256 insertions(+), 96 deletions(-) create mode 100644 pkg/pattern/aggregation/config.go create mode 100644 pkg/pattern/aggregation/metrics.go create mode 100644 pkg/pattern/aggregation/push.go create mode 100644 pkg/pattern/aggregation/push_test.go diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index ade3febc5e27e..38efa3f6bf6e7 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -42,7 +42,7 @@ pattern_ingester: enabled: true metric_aggregation: enabled: true - log_push_observations: true + loki_address: localhost:3100 ruler: alertmanager_url: http://localhost:9093 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index cd9524a9168ec..f2e42bf71e22c 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -787,6 +787,22 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string, return nil, "", 0, err } + // We do not want to count service_name added by us in the stream limit so adding it after validating original labels. + // We also don't want to add service name to aggregated metrics. + if !ls.Has(push.LabelServiceName) && !ls.Has(push.AggregatedMetricLabel) && + len(vContext.discoverServiceName) > 0 { + serviceName := push.ServiceUnknown + for _, labelName := range vContext.discoverServiceName { + if labelVal := ls.Get(labelName); labelVal != "" { + serviceName = labelVal + break + } + } + + ls = labels.NewBuilder(ls).Set(push.LabelServiceName, serviceName).Labels() + stream.Labels = ls.String() + } + lsHash := ls.Hash() d.labelCache.Add(key, labelData{ls, lsHash}) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index c21b1e2561cd2..ff41e498a13c4 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -31,10 +31,10 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/pkg/push" + loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/client" - loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/runtime" @@ -150,6 +150,7 @@ func Test_IncrementTimestamp(t *testing.T) { defaultLimits := &validation.Limits{} flagext.DefaultValues(defaultLimits) + now := time.Now() defaultLimits.DiscoverLogLevels = false tests := map[string]struct { @@ -397,6 +398,34 @@ func Test_IncrementTimestamp(t *testing.T) { }, }, }, + "default limit adding service_name label": { + limits: defaultLimits, + push: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {Timestamp: now.Add(-2 * time.Second), Line: "hey1"}, + {Timestamp: now.Add(-time.Second), Line: "hey2"}, + {Timestamp: now, Line: "hey3"}, + }, + }, + }, + }, + expectedPush: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\", service_name=\"foo\"}", + Hash: 0x86ca305b6d86e8b0, + Entries: []logproto.Entry{ + {Timestamp: now.Add(-2 * time.Second), Line: "hey1"}, + {Timestamp: now.Add(-time.Second), Line: "hey2"}, + {Timestamp: now, Line: "hey3"}, + }, + }, + }, + }, + }, } for testName, testData := range tests { @@ -519,46 +548,40 @@ func Test_SortLabelsOnPush(t *testing.T) { topVal := ingester.Peek() require.Equal(t, `{a="b", buzz="f", service_name="foo"}`, topVal.Streams[0].Labels) }) -} -func Test_TruncateLogLines(t *testing.T) { - setup := func() (*validation.Limits, *mockIngester) { + t.Run("with service_name added during ingestion", func(t *testing.T) { limits := &validation.Limits{} flagext.DefaultValues(limits) - - limits.MaxLineSize = 5 - limits.MaxLineSizeTruncate = true - return limits, &mockIngester{} - } - - t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) { - limits, ingester := setup() + ingester := &mockIngester{} distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - _, err := distributors[0].Push(ctx, makeWriteRequest(1, 10)) + request := makeWriteRequest(10, 10) + request.Streams[0].Labels = `{buzz="f", x="y", a="b"}` + _, err := distributors[0].Push(ctx, request) require.NoError(t, err) topVal := ingester.Peek() - require.Len(t, topVal.Streams[0].Entries[0].Line, 5) + require.Equal(t, `{a="b", buzz="f", service_name="unknown_service", x="y"}`, topVal.Streams[0].Labels) }) } -func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) { +func Test_TruncateLogLines(t *testing.T) { setup := func() (*validation.Limits, *mockIngester) { limits := &validation.Limits{} flagext.DefaultValues(limits) limits.MaxLineSize = 5 + limits.MaxLineSizeTruncate = true return limits, &mockIngester{} } - t.Run("it discards invalid entries and discards resulting empty streams completely", func(t *testing.T) { + t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) { limits, ingester := setup() distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) _, err := distributors[0].Push(ctx, makeWriteRequest(1, 10)) - require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\"}", 10))) + require.NoError(t, err) topVal := ingester.Peek() - require.Nil(t, topVal) + require.Len(t, topVal.Streams[0].Entries[0].Line, 5) }) } @@ -838,9 +861,53 @@ func TestParseStreamLabels(t *testing.T) { expectedErr error generateLimits func() *validation.Limits }{ + { + name: "service name label mapping disabled", + generateLimits: func() *validation.Limits { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.DiscoverServiceName = nil + return limits + }, + origLabels: `{foo="bar"}`, + expectedLabels: labels.Labels{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + { + name: "no labels defined - service name label mapping disabled", + generateLimits: func() *validation.Limits { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.DiscoverServiceName = nil + return limits + }, + origLabels: `{}`, + expectedErr: fmt.Errorf(validation.MissingLabelsErrorMsg), + }, + { + name: "service name label enabled", + origLabels: `{foo="bar"}`, + generateLimits: func() *validation.Limits { + return defaultLimit + }, + expectedLabels: labels.Labels{ + { + Name: "foo", + Value: "bar", + }, + { + Name: loghttp_push.LabelServiceName, + Value: loghttp_push.ServiceUnknown, + }, + }, + }, { name: "service name label should not get counted against max labels count", - origLabels: `{foo="bar", service_name="unknown_service"}`, + origLabels: `{foo="bar"}`, generateLimits: func() *validation.Limits { limits := &validation.Limits{} flagext.DefaultValues(limits) @@ -858,6 +925,31 @@ func TestParseStreamLabels(t *testing.T) { }, }, }, + { + name: "use label service as service name", + origLabels: `{container="nginx", foo="bar", service="auth"}`, + generateLimits: func() *validation.Limits { + return defaultLimit + }, + expectedLabels: labels.Labels{ + { + Name: "container", + Value: "nginx", + }, + { + Name: "foo", + Value: "bar", + }, + { + Name: "service", + Value: "auth", + }, + { + Name: loghttp_push.LabelServiceName, + Value: "auth", + }, + }, + }, } { limits := tc.generateLimits() distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil) diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index b4f730a58a7fa..3035ccaf266fc 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -158,13 +158,17 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea return fmt.Errorf(validation.MissingLabelsErrorMsg) } + // Skip validation for aggregated metric streams, as we create those for internal use + if ls.Has(push.AggregatedMetricLabel) { + return nil + } + numLabelNames := len(ls) // This is a special case that's often added by the Loki infrastructure. It may result in allowing one extra label // if incoming requests already have a service_name if ls.Has(push.LabelServiceName) { numLabelNames-- } - if numLabelNames > ctx.maxLabelNamesPerSeries { updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream) return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries) diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index a9b174952f286..1f838630d2622 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -10,8 +10,6 @@ import ( "net/http" "time" - "github.com/grafana/loki/v3/pkg/logql/syntax" - "github.com/go-kit/log/level" "github.com/grafana/loki/pkg/push" @@ -27,6 +25,7 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/loghttp" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/unmarshal" @@ -59,9 +58,10 @@ var ( ) const ( - applicationJSON = "application/json" - LabelServiceName = "service_name" - ServiceUnknown = "unknown_service" + applicationJSON = "application/json" + LabelServiceName = "service_name" + ServiceUnknown = "unknown_service" + AggregatedMetricLabel = "__aggregated_metric__" ) type TenantsRetention interface { @@ -70,7 +70,6 @@ type TenantsRetention interface { type Limits interface { OTLPConfig(userID string) OTLPConfig - DiscoverServiceName(userID string) []string } type EmptyLimits struct{} @@ -79,12 +78,10 @@ func (EmptyLimits) OTLPConfig(string) OTLPConfig { return DefaultOTLPConfig(GlobalOTLPConfig{}) } -func (EmptyLimits) DiscoverServiceName(string) []string { - return nil -} - -type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) -type RequestParserWrapper func(inner RequestParser) RequestParser +type ( + RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) + RequestParserWrapper func(inner RequestParser) RequestParser +) type Stats struct { Errs []error @@ -100,6 +97,8 @@ type Stats struct { BodySize int64 // Extra is a place for a wrapped perser to record any interesting stats as key-value pairs to be logged Extra []any + + IsAggregatedMetric bool } func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker) (*logproto.PushRequest, error) { @@ -112,21 +111,26 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete entriesSize int64 structuredMetadataSize int64 ) + for retentionPeriod, size := range pushStats.LogLinesBytes { retentionHours := RetentionPeriodToString(retentionPeriod) - bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) - bytesReceivedStats.Inc(size) + if !pushStats.IsAggregatedMetric { + bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) + bytesReceivedStats.Inc(size) + } entriesSize += size } for retentionPeriod, size := range pushStats.StructuredMetadataBytes { retentionHours := RetentionPeriodToString(retentionPeriod) - structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) - bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) - bytesReceivedStats.Inc(size) - structuredMetadataBytesReceivedStats.Inc(size) + if !pushStats.IsAggregatedMetric { + structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) + bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) + bytesReceivedStats.Inc(size) + structuredMetadataBytesReceivedStats.Inc(size) + } entriesSize += size structuredMetadataSize += size @@ -158,7 +162,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete return req, nil } -func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { +func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body @@ -227,31 +231,19 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe pushStats.ContentType = contentType pushStats.ContentEncoding = contentEncoding - discoverServiceName := limits.DiscoverServiceName(userID) - for i := range req.Streams { - s := req.Streams[i] + for _, s := range req.Streams { pushStats.StreamLabelsSize += int64(len(s.Labels)) - lbs, err := syntax.ParseLabels(s.Labels) - if err != nil { - return nil, nil, fmt.Errorf("couldn't parse labels: %w", err) - } - - if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 { - serviceName := ServiceUnknown - for _, labelName := range discoverServiceName { - if labelVal := lbs.Get(labelName); labelVal != "" { - serviceName = labelVal - break - } + var lbs labels.Labels + if tenantsRetention != nil || tracker != nil { + lbs, err = syntax.ParseLabels(s.Labels) + if err != nil { + return nil, nil, fmt.Errorf("couldn't parse labels: %w", err) } + } - lb := labels.NewBuilder(lbs) - lbs = lb.Set(LabelServiceName, serviceName).Labels() - s.Labels = lbs.String() - - // Remove the added label after it's added to the stream so it's not consumed by subsequent steps - lbs = lb.Del(LabelServiceName).Labels() + if lbs.Has(AggregatedMetricLabel) { + pushStats.IsAggregatedMetric = true } var retentionPeriod time.Duration @@ -276,8 +268,6 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe pushStats.MostRecentEntryTimestamp = e.Timestamp } } - - req.Streams[i] = s } return &req, pushStats, nil diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index e391f14c4da0b..ffbfc23acb8e0 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -703,8 +703,8 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(QuerySchedulerRing, t.initQuerySchedulerRing, modules.UserInvisibleModule) mm.RegisterModule(Analytics, t.initAnalytics) mm.RegisterModule(CacheGenerationLoader, t.initCacheGenerationLoader) - mm.RegisterModule(PatternIngester, t.initPatternIngester) mm.RegisterModule(PatternRingClient, t.initPatternRingClient, modules.UserInvisibleModule) + mm.RegisterModule(PatternIngester, t.initPatternIngester) mm.RegisterModule(Metastore, t.initMetastore) mm.RegisterModule(MetastoreClient, t.initMetastoreClient, modules.UserInvisibleModule) @@ -738,8 +738,8 @@ func (t *Loki) setupModuleManager() error { BloomPlanner: {Server, BloomStore, Analytics, Store}, BloomBuilder: {Server, BloomStore, Analytics, Store}, BloomStore: {IndexGatewayRing}, - PatternIngester: {Server, MemberlistKV, Analytics}, PatternRingClient: {Server, MemberlistKV, Analytics}, + PatternIngester: {Server, MemberlistKV, Analytics, PatternRingClient}, IngesterRF1RingClient: {Server, MemberlistKV, Analytics}, Metastore: {Server, MetastoreClient}, IngesterQuerier: {Ring}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 72161f5ebc3f4..7d74bffafb911 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -709,7 +709,13 @@ func (t *Loki) initPatternIngester() (_ services.Service, err error) { return nil, nil } t.Cfg.Pattern.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort - t.PatternIngester, err = pattern.New(t.Cfg.Pattern, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, util_log.Logger) + t.PatternIngester, err = pattern.New( + t.Cfg.Pattern, + t.PatternRingClient, + t.Cfg.MetricsNamespace, + prometheus.DefaultRegisterer, + util_log.Logger, + ) if err != nil { return nil, err } diff --git a/pkg/pattern/aggregation/config.go b/pkg/pattern/aggregation/config.go new file mode 100644 index 0000000000000..345e9ae8b6206 --- /dev/null +++ b/pkg/pattern/aggregation/config.go @@ -0,0 +1,107 @@ +package aggregation + +import ( + "flag" + "time" + + "github.com/grafana/dskit/backoff" + "github.com/prometheus/common/config" +) + +type Config struct { + // TODO(twhitney): This needs to be a per-tenant config + Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."` + DownsamplePeriod time.Duration `yaml:"downsample_period"` + LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."` + WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."` + PushPeriod time.Duration `yaml:"push_period,omitempty" doc:"description=How long to wait in between pushes to Loki."` + HTTPClientConfig config.HTTPClientConfig `yaml:"http_client_config,omitempty" doc:"description=The HTTP client configuration for pushing metrics to Loki."` + UseTLS bool `yaml:"use_tls,omitempty" doc:"description=Whether to use TLS for pushing metrics to Loki."` + BasicAuth BasicAuth `yaml:"basic_auth,omitempty" doc:"description=The basic auth configuration for pushing metrics to Loki."` + BackoffConfig backoff.Config `yaml:"backoff_config,omitempty" doc:"description=The backoff configuration for pushing metrics to Loki."` +} + +// RegisterFlags registers pattern ingester related flags. +func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix(fs, "") +} + +func (cfg *Config) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) { + fs.BoolVar( + &cfg.Enabled, + prefix+"metric-aggregation.enabled", + false, + "Flag to enable or disable metric aggregation.", + ) + fs.DurationVar( + &cfg.DownsamplePeriod, + prefix+"metric-aggregation.downsample-period", + 10*time.Second, + "How often to downsample metrics from raw push observations.", + ) + fs.StringVar( + &cfg.LokiAddr, + prefix+"metric-aggregation.loki-address", + "", + "Loki address to send aggregated metrics to.", + ) + fs.DurationVar( + &cfg.WriteTimeout, + prefix+"metric-aggregation.timeout", + 10*time.Second, + "How long to wait write response from Loki", + ) + fs.DurationVar( + &cfg.PushPeriod, + prefix+"metric-aggregation.push-period", + 1*time.Minute, + "How long to wait write response from Loki", + ) + fs.BoolVar( + &cfg.UseTLS, + prefix+"metric-aggregation.tls", + false, + "Does the loki connection use TLS?", + ) + + cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix+"metric-aggregation", fs) + cfg.BasicAuth.RegisterFlagsWithPrefix(prefix+"metric-aggregation.", fs) +} + +// BasicAuth contains basic HTTP authentication credentials. +type BasicAuth struct { + Username string `yaml:"username" json:"username"` + // UsernameFile string `yaml:"username_file,omitempty" json:"username_file,omitempty"` + Password config.Secret `yaml:"password,omitempty" json:"password,omitempty"` + // PasswordFile string `yaml:"password_file,omitempty" json:"password_file,omitempty"` +} + +func (cfg *BasicAuth) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) { + fs.StringVar( + &cfg.Username, + prefix+"basic-auth.username", + "", + "Basic auth username for sending aggregations back to Loki.", + ) + fs.Var( + newSecretValue(config.Secret(""), &cfg.Password), + prefix+"basic-auth.password", + "Basic auth password for sending aggregations back to Loki.", + ) +} + +type secretValue string + +func newSecretValue(val config.Secret, p *config.Secret) *secretValue { + *p = val + return (*secretValue)(p) +} + +func (s *secretValue) Set(val string) error { + *s = secretValue(val) + return nil +} + +func (s *secretValue) Get() any { return string(*s) } + +func (s *secretValue) String() string { return string(*s) } diff --git a/pkg/pattern/aggregation/metrics.go b/pkg/pattern/aggregation/metrics.go new file mode 100644 index 0000000000000..d777af50b8130 --- /dev/null +++ b/pkg/pattern/aggregation/metrics.go @@ -0,0 +1,28 @@ +package aggregation + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type ChunkMetrics struct { + chunks *prometheus.GaugeVec + samples *prometheus.CounterVec +} + +func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMetrics { + return &ChunkMetrics{ + chunks: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "metric_chunks", + Help: "The total number of chunks in memory.", + }, []string{"service_name"}), + samples: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "metric_samples", + Help: "The total number of samples in memory.", + }, []string{"service_name"}), + } +} diff --git a/pkg/pattern/aggregation/push.go b/pkg/pattern/aggregation/push.go new file mode 100644 index 0000000000000..73bf087f48fdb --- /dev/null +++ b/pkg/pattern/aggregation/push.go @@ -0,0 +1,309 @@ +package aggregation + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/golang/snappy" + "github.com/prometheus/common/config" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/util/build" + + "github.com/grafana/dskit/backoff" + + "github.com/gogo/protobuf/proto" +) + +const ( + defaultContentType = "application/x-protobuf" + defaultMaxReponseBufferLen = 1024 + + pushEndpoint = "/loki/api/v1/push" +) + +var defaultUserAgent = fmt.Sprintf("pattern-ingester-push/%s", build.GetVersion().Version) + +type EntryWriter interface { + // WriteEntry handles sending the log to the output + // To maintain consistent log timing, Write is expected to be non-blocking + WriteEntry(ts time.Time, entry string, lbls labels.Labels) + Stop() +} + +// Push is a io.Writer, that writes given log entries by pushing +// directly to the given loki server URL. Each `Push` instance handles for a single tenant. +// No batching of log lines happens when sending to Loki. +type Push struct { + lokiURL string + tenantID string + httpClient *http.Client + userAgent string + contentType string + logger log.Logger + + // shutdown channels + quit chan struct{} + done chan struct{} + + // auth + username, password string + + // Will add these label to the logs pushed to loki + labelName, labelValue, streamName, streamValue string + + // push retry and backoff + backoff *backoff.Config + + entries entries +} + +type entry struct { + ts time.Time + entry string + labels labels.Labels +} + +type entries struct { + lock sync.RWMutex + entries []entry +} + +func (e *entries) add(entry entry) { + e.lock.Lock() + defer e.lock.Unlock() + e.entries = append(e.entries, entry) +} + +func (e *entries) reset() { + e.lock.Lock() + defer e.lock.Unlock() + e.entries = e.entries[:0] +} + +// NewPush creates an instance of `Push` which writes logs directly to given `lokiAddr` +func NewPush( + lokiAddr, tenantID string, + timeout time.Duration, + pushPeriod time.Duration, + cfg config.HTTPClientConfig, + username, password string, + useTLS bool, + backoffCfg *backoff.Config, + logger log.Logger, +) (*Push, error) { + client, err := config.NewClientFromConfig(cfg, "pattern-ingester-push", config.WithHTTP2Disabled()) + if err != nil { + return nil, err + } + + client.Timeout = timeout + scheme := "http" + + // setup tls transport + if useTLS { + scheme = "https" + } + + u := url.URL{ + Scheme: scheme, + Host: lokiAddr, + Path: pushEndpoint, + } + + p := &Push{ + lokiURL: u.String(), + tenantID: tenantID, + httpClient: client, + userAgent: defaultUserAgent, + contentType: defaultContentType, + username: username, + password: password, + logger: logger, + quit: make(chan struct{}), + done: make(chan struct{}), + backoff: backoffCfg, + entries: entries{ + entries: make([]entry, 0), + }, + } + + go p.run(pushPeriod) + return p, nil +} + +// WriteEntry implements EntryWriter +func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels) { + p.entries.add(entry{ts: ts, entry: e, labels: lbls}) +} + +// Stop will cancel any ongoing requests and stop the goroutine listening for requests +func (p *Push) Stop() { + if p.quit != nil { + close(p.quit) + <-p.done + p.quit = nil + } +} + +// buildPayload creates the snappy compressed protobuf to send to Loki +func (p *Push) buildPayload() ([]byte, error) { + p.entries.lock.RLock() + defer p.entries.lock.RUnlock() + + entriesByStream := make(map[string][]logproto.Entry) + for _, e := range p.entries.entries { + stream := e.labels.String() + entries, ok := entriesByStream[stream] + if !ok { + entries = make([]logproto.Entry, 0) + } + + entries = append(entries, logproto.Entry{ + Timestamp: e.ts, + Line: e.entry, + }) + entriesByStream[stream] = entries + } + + streams := make([]logproto.Stream, 0, len(entriesByStream)) + for s, entries := range entriesByStream { + lbls, err := syntax.ParseLabels(s) + if err != nil { + continue + } + + streams = append(streams, logproto.Stream{ + Labels: s, + Entries: entries, + Hash: lbls.Hash(), + }) + } + + req := &logproto.PushRequest{ + Streams: streams, + } + payload, err := proto.Marshal(req) + if err != nil { + return []byte{}, fmt.Errorf("failed to marshal payload to json: %w", err) + } + + payload = snappy.Encode(nil, payload) + + return payload, nil +} + +// run pulls lines out of the channel and sends them to Loki +func (p *Push) run(pushPeriod time.Duration) { + ctx, cancel := context.WithCancel(context.Background()) + pushTicker := time.NewTimer(pushPeriod) + defer pushTicker.Stop() + + defer func() { + pushTicker.Stop() + close(p.done) + }() + + for { + select { + case <-p.quit: + cancel() + return + case <-pushTicker.C: + payload, err := p.buildPayload() + if err != nil { + level.Error(p.logger).Log("msg", "failed to build payload", "err", err) + continue + } + + // We will use a timeout within each attempt to send + backoff := backoff.New(context.Background(), *p.backoff) + + // send log with retry + for { + status := 0 + status, err = p.send(ctx, payload) + if err == nil { + // reset entries on successful push + p.entries.reset() + pushTicker.Reset(pushPeriod) + break + } + + if status > 0 && status != 429 && status/100 != 5 { + level.Error(p.logger).Log("msg", "failed to send entry, server rejected push with a non-retryable status code", "status", status, "err", err) + pushTicker.Reset(pushPeriod) + break + } + + if !backoff.Ongoing() { + level.Error(p.logger).Log("msg", "failed to send entry, retries exhausted, entry will be dropped", "entry", "status", status, "error", err) + pushTicker.Reset(pushPeriod) + break + } + level.Warn(p.logger). + Log("msg", "failed to send entry, retrying", "entry", "status", status, "error", err) + backoff.Wait() + } + + } + } +} + +// send makes one attempt to send the payload to Loki +func (p *Push) send(ctx context.Context, payload []byte) (int, error) { + var ( + err error + resp *http.Response + ) + // Set a timeout for the request + ctx, cancel := context.WithTimeout(ctx, p.httpClient.Timeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", p.lokiURL, bytes.NewReader(payload)) + if err != nil { + return -1, fmt.Errorf("failed to create push request: %w", err) + } + req.Header.Set("Content-Type", p.contentType) + req.Header.Set("User-Agent", p.userAgent) + + // set org-id + if p.tenantID != "" { + req.Header.Set("X-Scope-OrgID", p.tenantID) + } + + // basic auth if provided + if p.username != "" { + req.SetBasicAuth(p.username, p.password) + } + + resp, err = p.httpClient.Do(req) + if err != nil { + return -1, fmt.Errorf("failed to push payload: %w", err) + } + status := resp.StatusCode + if status/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, status, line) + } + + if err := resp.Body.Close(); err != nil { + level.Error(p.logger).Log("msg", "failed to close response body", "error", err) + } + + return status, err +} diff --git a/pkg/pattern/aggregation/push_test.go b/pkg/pattern/aggregation/push_test.go new file mode 100644 index 0000000000000..87685c914bffe --- /dev/null +++ b/pkg/pattern/aggregation/push_test.go @@ -0,0 +1,336 @@ +package aggregation + +import ( + "encoding/base64" + "fmt" + "math" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/util" +) + +const ( + testTenant = "test1" + testUsername = "user" + testPassword = "secret" + LogEntry = "%s %s\n" +) + +func Test_Push(t *testing.T) { + lbls := labels.New(labels.Label{Name: "test", Value: "test"}) + + // create dummy loki server + responses := make(chan response, 1) // buffered not to block the response handler + backoff := backoff.Config{ + MinBackoff: 300 * time.Millisecond, + MaxBackoff: 1 * time.Minute, + MaxRetries: 1, + } + + t.Run("sends log entry to loki server without TLS", func(t *testing.T) { + // mock loki server + mock := httptest.NewServer(createServerHandler(responses)) + require.NotNil(t, mock) + defer mock.Close() + + // without TLS + push, err := NewPush( + mock.Listener.Addr().String(), + "test1", + 2*time.Second, + 1*time.Second, + config.DefaultHTTPClientConfig, + "", "", + false, + &backoff, + log.NewNopLogger(), + ) + require.NoError(t, err) + ts, payload := testPayload() + push.WriteEntry(ts, payload, lbls) + resp := <-responses + assertResponse(t, resp, false, labelSet("test", "test"), ts, payload) + }) + + t.Run("sends log entry to loki server with basic auth", func(t *testing.T) { + // mock loki server + mock := httptest.NewServer(createServerHandler(responses)) + require.NotNil(t, mock) + defer mock.Close() + + // with basic Auth + push, err := NewPush( + mock.Listener.Addr().String(), + "test1", + 2*time.Second, + 1*time.Second, + config.DefaultHTTPClientConfig, + "user", "secret", + false, + &backoff, + log.NewNopLogger(), + ) + require.NoError(t, err) + ts, payload := testPayload() + push.WriteEntry(ts, payload, lbls) + resp := <-responses + assertResponse(t, resp, true, labelSet("test", "test"), ts, payload) + }) + + t.Run("batches push requests", func(t *testing.T) { + // mock loki server + mock := httptest.NewServer(createServerHandler(responses)) + require.NotNil(t, mock) + defer mock.Close() + + client, err := config.NewClientFromConfig( + config.DefaultHTTPClientConfig, + "pattern-ingester-push-test", + config.WithHTTP2Disabled(), + ) + require.NoError(t, err) + client.Timeout = 2 * time.Second + + u := url.URL{ + Scheme: "http", + Host: mock.Listener.Addr().String(), + Path: pushEndpoint, + } + + p := &Push{ + lokiURL: u.String(), + tenantID: "test1", + httpClient: client, + userAgent: defaultUserAgent, + contentType: defaultContentType, + username: "user", + password: "secret", + logger: log.NewNopLogger(), + quit: make(chan struct{}), + done: make(chan struct{}), + backoff: &backoff, + entries: entries{}, + } + + lbls1 := labels.New(labels.Label{Name: "test", Value: "test"}) + lbls2 := labels.New( + labels.Label{Name: "test", Value: "test"}, + labels.Label{Name: "test2", Value: "test2"}, + ) + + now := time.Now().Truncate(time.Second).UTC() + then := now.Add(-1 * time.Minute) + wayBack := now.Add(-5 * time.Minute) + + p.WriteEntry( + wayBack, + AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test_service", lbls1), + lbls1, + ) + p.WriteEntry( + then, + AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test_service", lbls1), + lbls1, + ) + p.WriteEntry( + now, + AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test_service", lbls1), + lbls1, + ) + + p.WriteEntry( + wayBack, + AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test2_service", lbls2), + lbls2, + ) + p.WriteEntry( + then, + AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test2_service", lbls2), + lbls2, + ) + p.WriteEntry( + now, + AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test2_service", lbls2), + lbls2, + ) + + go p.run(time.Nanosecond) + + select { + case resp := <-responses: + p.Stop() + req := resp.pushReq + assert.Len(t, req.Streams, 2) + + var stream1, stream2 logproto.Stream + for _, stream := range req.Streams { + if stream.Labels == lbls1.String() { + stream1 = stream + } + + if stream.Labels == lbls2.String() { + stream2 = stream + } + } + + require.Len(t, stream1.Entries, 3) + require.Len(t, stream2.Entries, 3) + + require.Equal(t, stream1.Entries[0].Timestamp, wayBack) + require.Equal(t, stream1.Entries[1].Timestamp, then) + require.Equal(t, stream1.Entries[2].Timestamp, now) + + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test_service", lbls1), + stream1.Entries[0].Line, + ) + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test_service", lbls1), + stream1.Entries[1].Line, + ) + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test_service", lbls1), + stream1.Entries[2].Line, + ) + + require.Equal(t, stream2.Entries[0].Timestamp, wayBack) + require.Equal(t, stream2.Entries[1].Timestamp, then) + require.Equal(t, stream2.Entries[2].Timestamp, now) + + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test2_service", lbls2), + stream2.Entries[0].Line, + ) + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test2_service", lbls2), + stream2.Entries[1].Line, + ) + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test2_service", lbls2), + stream2.Entries[2].Line, + ) + + case <-time.After(5 * time.Second): + t.Fatal("timeout") + } + }) +} + +// Test helpers + +func assertResponse(t *testing.T, resp response, testAuth bool, labels labels.Labels, ts time.Time, payload string) { + t.Helper() + + // assert metadata + assert.Equal(t, testTenant, resp.tenantID) + + var expUser, expPass string + + if testAuth { + expUser = testUsername + expPass = testPassword + } + + assert.Equal(t, expUser, resp.username) + assert.Equal(t, expPass, resp.password) + assert.Equal(t, defaultContentType, resp.contentType) + assert.Equal(t, defaultUserAgent, resp.userAgent) + + // assert stream labels + require.Len(t, resp.pushReq.Streams, 1) + assert.Equal(t, labels.String(), resp.pushReq.Streams[0].Labels) + assert.Equal(t, labels.Hash(), resp.pushReq.Streams[0].Hash) + + // assert log entry + require.Len(t, resp.pushReq.Streams, 1) + require.Len(t, resp.pushReq.Streams[0].Entries, 1) + assert.Equal(t, payload, resp.pushReq.Streams[0].Entries[0].Line) + assert.Equal(t, ts, resp.pushReq.Streams[0].Entries[0].Timestamp) +} + +type response struct { + tenantID string + pushReq logproto.PushRequest + contentType string + userAgent string + username, password string +} + +func createServerHandler(responses chan response) http.HandlerFunc { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + // Parse the request + var pushReq logproto.PushRequest + if err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil { + rw.WriteHeader(500) + return + } + + var username, password string + + basicAuth := req.Header.Get("Authorization") + if basicAuth != "" { + encoded := strings.TrimPrefix(basicAuth, "Basic ") // now we have just encoded `username:password` + decoded, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + rw.WriteHeader(500) + return + } + toks := strings.FieldsFunc(string(decoded), func(r rune) bool { + return r == ':' + }) + username, password = toks[0], toks[1] + } + + responses <- response{ + tenantID: req.Header.Get("X-Scope-OrgID"), + contentType: req.Header.Get("Content-Type"), + userAgent: req.Header.Get("User-Agent"), + username: username, + password: password, + pushReq: pushReq, + } + + rw.WriteHeader(http.StatusOK) + }) +} + +func labelSet(keyVals ...string) labels.Labels { + if len(keyVals)%2 != 0 { + panic("not matching key-value pairs") + } + + lbls := labels.Labels{} + + for i := 0; i < len(keyVals)-1; i += 2 { + lbls = append(lbls, labels.Label{Name: keyVals[i], Value: keyVals[i+1]}) + } + + return lbls +} + +func testPayload() (time.Time, string) { + ts := time.Now().UTC() + payload := fmt.Sprintf(LogEntry, fmt.Sprint(ts.UnixNano()), "pppppp") + + return ts, payload +} diff --git a/pkg/pattern/flush.go b/pkg/pattern/flush.go index d53b486a168c5..196e0da031ae3 100644 --- a/pkg/pattern/flush.go +++ b/pkg/pattern/flush.go @@ -73,3 +73,22 @@ func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) { return true, nil }) } + +func (i *Ingester) stopWriters() { + instances := i.getInstances() + + for _, instance := range instances { + if instance.writer != nil { + instance.writer.Stop() + } + } +} + + +func (i *Ingester) downsampleMetrics(ts model.Time) { + instances := i.getInstances() + + for _, instance := range instances { + instance.Downsample(ts) + } +} diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 8864a03960bc1..9aaf49a4b9ca8 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -16,11 +16,13 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/dskit/tenant" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "google.golang.org/grpc/health/grpc_health_v1" ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/pattern/aggregation" "github.com/grafana/loki/v3/pkg/pattern/clientpool" "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" @@ -38,6 +40,7 @@ type Config struct { FlushCheckPeriod time.Duration `yaml:"flush_check_period"` MaxClusters int `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."` MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."` + MetricAggregation aggregation.Config `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."` // For testing. factory ring_client.PoolFactory `yaml:"-"` @@ -47,6 +50,7 @@ type Config struct { func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlagsWithPrefix("pattern-ingester.", fs, util_log.Logger) cfg.ClientConfig.RegisterFlags(fs) + cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.") fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.") fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.") fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 1*time.Minute, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.") @@ -64,6 +68,7 @@ func (cfg *Config) Validate() error { type Ingester struct { services.Service lifecycler *ring.Lifecycler + ringClient *RingClient lifecyclerWatcher *services.FailureWatcher @@ -87,6 +92,7 @@ type Ingester struct { func New( cfg Config, + ringClient *RingClient, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, @@ -100,6 +106,7 @@ func New( i := &Ingester{ cfg: cfg, + ringClient: ringClient, logger: log.With(logger, "component", "pattern-ingester"), registerer: registerer, metrics: metrics, @@ -165,6 +172,7 @@ func (i *Ingester) stopping(_ error) error { flushQueue.Close() } i.flushQueuesDone.Wait() + i.stopWriters() return err } @@ -196,13 +204,29 @@ func (i *Ingester) loop() { flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j) defer flushTicker.Stop() - for { - select { - case <-flushTicker.C: - i.sweepUsers(false, true) - - case <-i.loopQuit: - return +if i.cfg.MetricAggregation.Enabled { + downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod) + defer downsampleTicker.Stop() + for { + select { + case <-flushTicker.C: + i.sweepUsers(false, true) + case t := <-downsampleTicker.C: + downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod) + now := model.TimeFromUnixNano(t.UnixNano()) + i.downsampleMetrics(now) + case <-i.loopQuit: + return + } + } + } else { + for { + select { + case <-flushTicker.C: + i.sweepUsers(false, true) + case <-i.loopQuit: + return + } } } } @@ -284,11 +308,34 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / inst, ok = i.instances[instanceID] if !ok { var err error + var writer aggregation.EntryWriter + + aggCfg := i.cfg.MetricAggregation + if aggCfg.Enabled { + writer, err = aggregation.NewPush( + aggCfg.LokiAddr, + instanceID, + aggCfg.WriteTimeout, + aggCfg.PushPeriod, + aggCfg.HTTPClientConfig, + aggCfg.BasicAuth.Username, + string(aggCfg.BasicAuth.Password), + aggCfg.UseTLS, + &aggCfg.BackoffConfig, + i.logger, + ) + if err != nil { + return nil, err + } + } inst, err = newInstance( instanceID, i.logger, i.metrics, i.drainCfg, + i.ringClient, + i.lifecycler.ID, + writer, ) if err != nil { return nil, err diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index e19ba040ff71e..64c3576d0cb38 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -2,22 +2,30 @@ package pattern import ( "context" + "errors" "fmt" "net/http" + "sync" + "github.com/dustin/go-humanize" "github.com/go-kit/log" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/multierror" + "github.com/grafana/dskit/ring" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/detection" "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/index" + "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/pattern/aggregation" "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/grafana/loki/v3/pkg/util" + lokiring "github.com/grafana/loki/v3/pkg/util/ring" ) const indexShards = 32 @@ -32,21 +40,37 @@ type instance struct { logger log.Logger metrics *ingesterMetrics drainCfg *drain.Config + ringClient *RingClient + ingesterID string + + aggMetricsLock sync.Mutex + aggMetricsByStream map[string]aggregatedMetrics + + writer aggregation.EntryWriter +} + +type aggregatedMetrics struct { + bytes uint64 + count uint64 } -func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics, drainCfg *drain.Config) (*instance, error) { +func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics, drainCfg *drain.Config, ringClient *RingClient, ingesterID string, writer aggregation.EntryWriter) (*instance, error) { index, err := index.NewBitPrefixWithShards(indexShards) if err != nil { return nil, err } i := &instance{ - buf: make([]byte, 0, 1024), - logger: logger, - instanceID: instanceID, - streams: newStreamsMap(), - index: index, - metrics: metrics, - drainCfg: drainCfg, + buf: make([]byte, 0, 1024), + logger: logger, + instanceID: instanceID, + streams: newStreamsMap(), + index: index, + metrics: metrics, + drainCfg: drainCfg, + ringClient: ringClient, + ingesterID: ingesterID, + aggMetricsByStream: make(map[string]aggregatedMetrics), + writer: writer, } i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint) return i, nil @@ -58,27 +82,68 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { appendErr := multierror.New() for _, reqStream := range req.Streams { - if reqStream.Entries == nil || len(reqStream.Entries) == 0 { - continue - } - s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels, - func() (*stream, error) { - // add stream - return i.createStream(ctx, reqStream) - }, nil) + // All streams are observed for metrics + i.Observe(reqStream.Labels, reqStream.Entries) + + // But only owned streamd are processed for patterns + ownedStream, err := i.isOwnedStream(i.ingesterID, reqStream.Labels) if err != nil { appendErr.Add(err) - continue } - err = s.Push(ctx, reqStream.Entries) - if err != nil { - appendErr.Add(err) - continue + + if ownedStream { + if reqStream.Entries == nil || len(reqStream.Entries) == 0 { + continue + } + s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels, + func() (*stream, error) { + // add stream + return i.createStream(ctx, reqStream) + }, nil) + if err != nil { + appendErr.Add(err) + continue + } + err = s.Push(ctx, reqStream.Entries) + if err != nil { + appendErr.Add(err) + continue + } } } + return appendErr.Err() } +func (i *instance) isOwnedStream(ingesterID string, stream string) (bool, error) { + var descs [1]ring.InstanceDesc + replicationSet, err := i.ringClient.ring.Get( + lokiring.TokenFor(i.instanceID, stream), + ring.WriteNoExtend, + descs[:0], + nil, + nil, + ) + if err != nil { + return false, fmt.Errorf( + "error getting replication set for stream %s: %v", + stream, + err, + ) + } + + if replicationSet.Instances == nil { + return false, errors.New("no instances found") + } + + for _, instanceDesc := range replicationSet.Instances { + if instanceDesc.Id == ingesterID { + return true, nil + } + } + return false, nil +} + // Iterator returns an iterator of pattern samples matching the given query patterns request. func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequest) (iter.Iterator, error) { matchers, err := syntax.ParseMatchers(req.Query, true) @@ -174,3 +239,95 @@ func (i *instance) removeStream(s *stream) { i.index.Delete(s.labels, s.fp) } } + +func (i *instance) Observe(stream string, entries []logproto.Entry) { + i.aggMetricsLock.Lock() + defer i.aggMetricsLock.Unlock() + + metrics, ok := i.aggMetricsByStream[stream] + if !ok { + metrics = aggregatedMetrics{} + } + + bytes := uint64(0) + count := uint64(len(entries)) + for _, entry := range entries { + bytes += uint64(len(entry.Line)) + } + + metrics.bytes += bytes + metrics.count += count + + i.aggMetricsByStream[stream] = metrics +} + +func (i *instance) Downsample(now model.Time) { + i.aggMetricsLock.Lock() + defer func() { + i.aggMetricsByStream = make(map[string]aggregatedMetrics) + i.aggMetricsLock.Unlock() + }() + + for stream, metrics := range i.aggMetricsByStream { + // TODO(twhitney) + // c.metrics.samples.Inc() + + lbls, err := syntax.ParseLabels(stream) + if err != nil { + continue + } + + i.writeAggregatedMetrics(now, lbls, metrics.bytes, metrics.count) + } +} + +func (i *instance) writeAggregatedMetrics( + now model.Time, + streamLbls labels.Labels, + totalBytes, totalCount uint64, +) { + service := streamLbls.Get(push.LabelServiceName) + if service == "" { + service = push.ServiceUnknown + } + + level := streamLbls.Get("level") + if level == "" { + level = detection.LogLevelUnknown + } + + newLbls := labels.Labels{ + labels.Label{Name: push.AggregatedMetricLabel, Value: service}, + labels.Label{Name: "level", Value: level}, + } + + if i.writer != nil { + i.writer.WriteEntry( + now.Time(), + AggregatedMetricEntry(now, totalBytes, totalCount, service, streamLbls), + newLbls, + ) + } +} + +func AggregatedMetricEntry( + ts model.Time, + totalBytes, totalCount uint64, + service string, + lbls labels.Labels, +) string { + byteString := humanize.Bytes(totalBytes) + base := fmt.Sprintf( + "ts=%d bytes=%s count=%d %s=%s", + ts.UnixNano(), + byteString, + totalCount, + push.LabelServiceName, service, + ) + + for _, l := range lbls { + base += fmt.Sprintf(" %s=%s", l.Name, l.Value) + } + + return base +} diff --git a/pkg/pattern/tee.go b/pkg/pattern/tee.go index 70fb37e1b6929..dd8f365405820 100644 --- a/pkg/pattern/tee.go +++ b/pkg/pattern/tee.go @@ -20,7 +20,8 @@ type Tee struct { logger log.Logger ringClient *RingClient - ingesterAppends *prometheus.CounterVec + ingesterAppends *prometheus.CounterVec + fallbackIngesterAppends *prometheus.CounterVec } func NewTee( @@ -36,7 +37,11 @@ func NewTee( logger: log.With(logger, "component", "pattern-tee"), ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "pattern_ingester_appends_total", - Help: "The total number of batch appends sent to pattern ingesters.", + Help: "The total number of batch appends of owned streams sent to pattern ingesters.", + }, []string{"ingester", "status"}), + fallbackIngesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "pattern_ingester_fallback_appends_total", + Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.", }, []string{"ingester", "status"}), cfg: cfg, ringClient: ringClient, @@ -57,6 +62,50 @@ func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) { } func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { + err := t.sendOwnedStream(tenant, stream) + if err == nil { + return nil + } + + // Pattern ingesters serve 2 functions, processing patterns and aggregating metrics. + // Only owned streams are processed for patterns, however any pattern ingester can + // aggregate metrics for any stream. Therefore, if we can't send the owned stream, + // try to send it to any pattern ingester so we at least capture the metrics. + replicationSet, err := t.ringClient.ring.GetAllHealthy(ring.Read) + if replicationSet.Instances == nil { + return errors.New("no instances found") + } + + for _, instance := range replicationSet.Instances { + addr := instance.Addr + client, err := t.ringClient.pool.GetClientFor(addr) + if err != nil { + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + stream.Stream, + }, + } + + ctx, cancel := context.WithTimeout( + user.InjectOrgID(context.Background(), tenant), + t.cfg.ClientConfig.RemoteTimeout, + ) + defer cancel() + _, err = client.(logproto.PatternClient).Push(ctx, req) + if err != nil { + t.fallbackIngesterAppends.WithLabelValues(addr, "fail").Inc() + continue + } + t.fallbackIngesterAppends.WithLabelValues(addr, "success").Inc() + // bail after any success to prevent sending more than one + return nil + } + } + + return err +} + +func (t *Tee) sendOwnedStream(tenant string, stream distributor.KeyedStream) error { var descs [1]ring.InstanceDesc replicationSet, err := t.ringClient.ring.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) if err != nil {