diff --git a/CHANGELOG.md b/CHANGELOG.md index 5716ade066..41096862fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ This document contains a historical list of changes between releases. Only changes that impact end-user behavior are listed; changes to documentation or internal API changes are not present. -v1.5.0-rc.0 +v1.5.0-rc.1 ----------------- ### Breaking changes @@ -25,7 +25,10 @@ v1.5.0-rc.0 ### Features +- Add support bundle generation via the `/-/support` API endpoint. (@dehaansa) + - Add the function `path_join` to the stdlib. (@wildum) + - Add `pyroscope.receive_http` component to receive and forward Pyroscope profiles. (@marcsanmi) - Add support to `loki.source.syslog` for the RFC3164 format ("BSD syslog"). (@sushain97) @@ -37,6 +40,8 @@ v1.5.0-rc.0 - (_Experimental_) Add a `prometheus.write.queue` component to add an alternative to `prometheus.remote_write` which allowing the writing of metrics to a prometheus endpoint. (@mattdurham) +- (_Experimental_) Add the `array.combine_maps` function to the stdlib. (@ptodev, @wildum) + ### Enhancements - The `mimir.rules.kubernetes` component now supports adding extra label matchers diff --git a/docs/sources/reference/cli/run.md b/docs/sources/reference/cli/run.md index f246bb894e..c154696260 100644 --- a/docs/sources/reference/cli/run.md +++ b/docs/sources/reference/cli/run.md @@ -42,6 +42,7 @@ The following flags are supported: * `--server.http.ui-path-prefix`: Base path where the UI is exposed (default `/`). * `--storage.path`: Base directory where components can store data (default `data-alloy/`). * `--disable-reporting`: Disable [data collection][] (default `false`). +* `--disable-support-bundle`: Disable the [support bundle][] endpoint (default `false`). * `--cluster.enabled`: Start {{< param "PRODUCT_NAME" >}} in clustered mode (default `false`). * `--cluster.node-name`: The name to use for this node (defaults to the environment's hostname). 
* `--cluster.join-addresses`: Comma-separated list of addresses to join the cluster at (default `""`). Mutually exclusive with `--cluster.discover-peers`. @@ -178,6 +179,7 @@ Refer to [alloy convert][] for more details on how `extra-args` work. [go-discover]: https://github.com/hashicorp/go-discover [in-memory HTTP traffic]: ../../../get-started/component_controller/#in-memory-traffic [data collection]: ../../../data-collection/ +[support bundle]: ../../../troubleshoot/support_bundle [components]: ../../get-started/components/ [component controller]: ../../../get-started/component_controller/ [UI]: ../../../troubleshoot/debug/#clustering-page diff --git a/docs/sources/reference/stdlib/array.md b/docs/sources/reference/stdlib/array.md index 482cc60e65..b9fb947bdc 100644 --- a/docs/sources/reference/stdlib/array.md +++ b/docs/sources/reference/stdlib/array.md @@ -32,3 +32,54 @@ Elements within the list can be any type. > array.concat([[1, 2], [3, 4]], [[5, 6]]) [[1, 2], [3, 4], [5, 6]] ``` + +## array.combine_maps + +> **EXPERIMENTAL**: This is an [experimental][] feature. Experimental +> features are subject to frequent breaking changes, and may be removed with +> no equivalent replacement. The `stability.level` flag must be set to `experimental` +> to use the feature. + +The `array.combine_maps` function allows you to join two arrays of maps if certain keys have matching values in both maps. It's particularly useful for combining labels of targets coming from different `discovery.*` or `prometheus.exporter.*` components. +It takes three arguments: + +* The first two arguments are of type `list(map(string))`. The keys of the map are strings. + The value for each key can be any Alloy type, such as a `string`, `integer`, `map`, or `capsule`. +* The third argument is an `array` containing strings. The strings are the keys whose values must match for two maps to be combined. 
+ +Maps that don't contain all of the keys provided in the third argument are discarded. When two maps are combined and both contain the same keys, the value from the map in the second argument is used. + +Pseudocode for the function: +``` +for every map in arg1: + for every map in arg2: + if all condition keys have matching values in both maps: + merge maps and add to result +``` + +### Examples + +```alloy +> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}], [{"instance"="1.1.1.1", "cluster"="prod"}], ["instance"]) +[{"instance"="1.1.1.1", "team"="A", "cluster"="prod"}] + +// The second map overrides the team in the first map +> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}], [{"instance"="1.1.1.1", "team"="B"}], ["instance"]) +[{"instance"="1.1.1.1", "team"="B"}] + +// If multiple maps from the first argument match multiple maps from the second argument, all combinations are created. +> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}, {"instance"="1.1.1.1", "team"="B"}], [{"instance"="1.1.1.1", "cluster"="prod"}, {"instance"="1.1.1.1", "cluster"="ops"}], ["instance"]) +[{"instance"="1.1.1.1", "team"="A", "cluster"="prod"}, {"instance"="1.1.1.1", "team"="A", "cluster"="ops"}, {"instance"="1.1.1.1", "team"="B", "cluster"="prod"}, {"instance"="1.1.1.1", "team"="B", "cluster"="ops"}] +``` + +Examples using discovery and exporter components: +```alloy +> array.combine_maps(discovery.kubernetes.k8s_pods.targets, prometheus.exporter.postgres.default.targets, ["instance"]) + +> array.combine_maps(prometheus.exporter.redis.default.targets, [{"instance"="1.1.1.1", "testLabelKey" = "testLabelVal"}], ["instance"]) +``` + +You can find more examples in the [tests][]. 
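The merge semantics described above can be sketched in plain Go. This is an illustrative analogue using `map[string]string` values only, not the actual stdlib implementation (which operates on Alloy `value.Value` objects and supports any value type):

```go
package main

import "fmt"

// combineMaps merges every pair (l, r) from the two slices whose match keys
// are present with equal values in both maps. On shared non-key fields, the
// value from the right-hand (second) map wins.
func combineMaps(lhs, rhs []map[string]string, keys []string) []map[string]string {
	var out []map[string]string
	for _, l := range lhs {
		for _, r := range rhs {
			if !matches(l, r, keys) {
				continue
			}
			merged := map[string]string{}
			for k, v := range l {
				merged[k] = v
			}
			for k, v := range r { // right-hand side overrides duplicates
				merged[k] = v
			}
			out = append(out, merged)
		}
	}
	return out
}

// matches reports whether every match key exists in both maps with equal values.
func matches(l, r map[string]string, keys []string) bool {
	for _, k := range keys {
		lv, lok := l[k]
		rv, rok := r[k]
		if !lok || !rok || lv != rv {
			return false
		}
	}
	return true
}

func main() {
	res := combineMaps(
		[]map[string]string{{"instance": "1.1.1.1", "team": "A"}},
		[]map[string]string{{"instance": "1.1.1.1", "cluster": "prod"}},
		[]string{"instance"},
	)
	fmt.Println(res) // one merged map containing instance, team, and cluster
}
```

Note that, as in the documented examples, non-matching pairs simply produce no output map, so the result can be shorter than either input.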
+ +[tests]: https://github.com/grafana/alloy/blob/main/syntax/vm/vm_stdlib_test.go +[experimental]: https://grafana.com/docs/release-life-cycle/ \ No newline at end of file diff --git a/docs/sources/troubleshoot/support_bundle.md b/docs/sources/troubleshoot/support_bundle.md new file mode 100644 index 0000000000..2bb870bc5b --- /dev/null +++ b/docs/sources/troubleshoot/support_bundle.md @@ -0,0 +1,51 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/troubleshoot/support_bundle/ +description: Learn how to generate a support bundle +title: Generate a support bundle +menuTitle: Generate a support bundle +weight: 300 +--- + +# Generate a support bundle + +{{< docs/public-preview product="Generate support bundle" >}} + +The `/-/support?duration=N` endpoint returns a support bundle, a zip file that contains information +about a running {{< param "PRODUCT_NAME" >}} instance. You can use it as a baseline of information +when debugging an issue. + +This feature is not covered by our [backward-compatibility][backward-compatibility] guarantees. + +{{< admonition type="note" >}} +This endpoint is enabled by default, but can be disabled with the `--disable-support-bundle` runtime flag. +{{< /admonition >}} + +The `duration` parameter is optional. It must be less than or equal to the +configured HTTP server write timeout, and defaults to that timeout if not provided. +The endpoint is only exposed on the {{< param "PRODUCT_NAME" >}} HTTP server listen address, which +defaults to `localhost:12345`. + +The support bundle contains all information in plain text, so you can +inspect it before sharing to verify that no sensitive information has leaked. + +In addition, you can inspect the [supportbundle implementation](https://github.com/grafana/alloy/tree/internal/service/http/supportbundle.go) +to verify the code used to generate these bundles. 
+ +A support bundle contains the following data: +* `alloy-components.json` contains information about the [components][components] running on this {{< param "PRODUCT_NAME" >}} instance, generated by the +`/api/v0/web/components` endpoint. +* `alloy-logs.txt` contains the logs during the bundle generation. +* `alloy-metadata.yaml` contains the {{< param "PRODUCT_NAME" >}} build version and the installation's operating system, architecture, and uptime. +* `alloy-metrics.txt` contains a snapshot of the internal metrics for {{< param "PRODUCT_NAME" >}}. +* `alloy-peers.json` contains information about the identified cluster peers of this {{< param "PRODUCT_NAME" >}} instance, generated by the +`/api/v0/web/peers` endpoint. +* `alloy-runtime-flags.txt` contains the values of the runtime flags available in {{< param "PRODUCT_NAME" >}}. +* The `pprof/` directory contains Go runtime profiling data (CPU, heap, goroutine, mutex, block profiles) as exported by the pprof package. +Refer to the [profile][profile] documentation for more details on how to use this information. + +[profile]: ../profile +[components]: ../../get-started/components/ +[alloy-repo]: https://github.com/grafana/alloy/issues +[backward-compatibility]: ../../introduction/backward-compatibility \ No newline at end of file diff --git a/go.mod b/go.mod index 449008346f..b8af8ca0cb 100644 --- a/go.mod +++ b/go.mod @@ -842,6 +842,8 @@ require ( go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.7.0 // indirect ) +require github.com/mackerelio/go-osstat v0.2.5 + // NOTE: replace directives below must always be *temporary*. 
// // Adding a replace directive to change a module to a fork of a module will diff --git a/go.sum b/go.sum index f1c249de2a..424d0eb4c7 100644 --- a/go.sum +++ b/go.sum @@ -1702,6 +1702,8 @@ github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c h1:VtwQ41oftZwlMn github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE= github.com/lyft/protoc-gen-validate v0.0.0-20180911180927-64fcb82c878e/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= +github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= +github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/internal/alloycli/cmd_run.go b/internal/alloycli/cmd_run.go index 33f9798eb7..306a030c55 100644 --- a/internal/alloycli/cmd_run.go +++ b/internal/alloycli/cmd_run.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/ckit/peer" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/cobra" + "github.com/spf13/pflag" "go.opentelemetry.io/otel" "golang.org/x/exp/maps" @@ -64,6 +65,7 @@ func runCommand() *cobra.Command { clusterAdvInterfaces: advertise.DefaultInterfaces, clusterMaxJoinPeers: 5, clusterRejoinInterval: 60 * time.Second, + disableSupportBundle: false, } cmd := &cobra.Command{ @@ -100,7 +102,7 @@ depending on the nature of the reload error. SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { - return r.Run(args[0]) + return r.Run(cmd, args[0]) }, } @@ -111,6 +113,8 @@ depending on the nature of the reload error. 
cmd.Flags().StringVar(&r.uiPrefix, "server.http.ui-path-prefix", r.uiPrefix, "Prefix to serve the HTTP UI at") cmd.Flags(). BoolVar(&r.enablePprof, "server.http.enable-pprof", r.enablePprof, "Enable /debug/pprof profiling endpoints.") + cmd.Flags(). + BoolVar(&r.disableSupportBundle, "server.http.disable-support-bundle", r.disableSupportBundle, "Disable /-/support support bundle retrieval.") // Cluster flags cmd.Flags(). @@ -184,9 +188,10 @@ type alloyRun struct { configBypassConversionErrors bool configExtraArgs string enableCommunityComps bool + disableSupportBundle bool } -func (fr *alloyRun) Run(configPath string) error { +func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error { var wg sync.WaitGroup defer wg.Wait() @@ -275,8 +280,15 @@ func (fr *alloyRun) Run(configPath string) error { return err } + runtimeFlags := []string{} + if !fr.disableSupportBundle { + cmd.Flags().VisitAll(func(f *pflag.Flag) { + runtimeFlags = append(runtimeFlags, fmt.Sprintf("%s=%s", f.Name, f.Value.String())) + }) + } + httpService := httpservice.New(httpservice.Options{ - Logger: log.With(l, "service", "http"), + Logger: l, Tracer: t, Gatherer: prometheus.DefaultGatherer, @@ -286,6 +298,11 @@ func (fr *alloyRun) Run(configPath string) error { HTTPListenAddr: fr.httpListenAddr, MemoryListenAddr: fr.inMemoryAddr, EnablePProf: fr.enablePprof, + MinStability: fr.minStability, + BundleContext: httpservice.SupportBundleContext{ + RuntimeFlags: runtimeFlags, + DisableSupportBundle: fr.disableSupportBundle, + }, }) remoteCfgService, err := remotecfgservice.New(remotecfgservice.Options{ diff --git a/internal/runtime/internal/controller/component_references.go b/internal/runtime/internal/controller/component_references.go index cc5205dfdc..5754878042 100644 --- a/internal/runtime/internal/controller/component_references.go +++ b/internal/runtime/internal/controller/component_references.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/go-kit/log" + 
"github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/internal/dag" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/syntax/ast" @@ -18,6 +19,17 @@ import ( // will be (field_a, field_b, field_c). type Traversal []*ast.Ident +// String returns a dot-separated string representation of the field names in the traversal. +// For example, a traversal of fields [field_a, field_b, field_c] returns "field_a.field_b.field_c". +// Returns an empty string if the traversal contains no fields. +func (t Traversal) String() string { + var fieldNames []string + for _, field := range t { + fieldNames = append(fieldNames, field.Name) + } + return strings.Join(fieldNames, ".") +} + // Reference describes an Alloy expression reference to a BlockNode. type Reference struct { Target BlockNode // BlockNode being referenced @@ -29,7 +41,7 @@ type Reference struct { // ComponentReferences returns the list of references a component is making to // other components. 
-func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope) ([]Reference, diag.Diagnostics) { +func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope, minStability featuregate.Stability) ([]Reference, diag.Diagnostics) { var ( traversals []Traversal @@ -63,6 +75,16 @@ func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scop refs = append(refs, ref) } else if scope.IsStdlibDeprecated(t[0].Name) { level.Warn(l).Log("msg", "this stdlib function is deprecated; please refer to the documentation for updated usage and alternatives", "function", t[0].Name) + } else if funcName := t.String(); scope.IsStdlibExperimental(funcName) { + if err := featuregate.CheckAllowed(featuregate.StabilityExperimental, minStability, funcName); err != nil { + diags = append(diags, diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: err.Error(), + StartPos: ast.StartPos(t[0]).Position(), + EndPos: ast.StartPos(t[len(t)-1]).Position(), + }) + continue + } } } diff --git a/internal/runtime/internal/controller/loader.go b/internal/runtime/internal/controller/loader.go index 8cbe0061fe..fae75f5865 100644 --- a/internal/runtime/internal/controller/loader.go +++ b/internal/runtime/internal/controller/loader.go @@ -615,7 +615,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { // Finally, wire component references. 
l.cache.mut.RLock() - refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope) + refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope, l.globals.MinStability) l.cache.mut.RUnlock() for _, ref := range refs { g.AddEdge(dag.Edge{From: n, To: ref.Target}) diff --git a/internal/runtime/logging/logger.go b/internal/runtime/logging/logger.go index 5bc2402772..4e1b5bd5a3 100644 --- a/internal/runtime/logging/logger.go +++ b/internal/runtime/logging/logger.go @@ -55,6 +55,12 @@ func New(w io.Writer, o Options) (*Logger, error) { return l, nil } +// NewNop returns a logger that does nothing +func NewNop() *Logger { + l, _ := NewDeferred(io.Discard) + return l +} + // NewDeferred creates a new logger with the default log level and format. // The logger is not updated during initialization. func NewDeferred(w io.Writer) (*Logger, error) { @@ -63,7 +69,6 @@ func NewDeferred(w io.Writer) (*Logger, error) { format formatVar writer writerVar ) - l := &Logger{ inner: w, @@ -104,11 +109,10 @@ func (l *Logger) Update(o Options) error { l.level.Set(slogLevel(o.Level).Level()) l.format.Set(o.Format) - newWriter := l.inner + l.writer.SetInnerWriter(l.inner) if len(o.WriteTo) > 0 { - newWriter = io.MultiWriter(l.inner, &lokiWriter{o.WriteTo}) + l.writer.SetLokiWriter(&lokiWriter{o.WriteTo}) } - l.writer.Set(newWriter) // Build all our deferred handlers if l.deferredSlog != nil { @@ -133,6 +137,14 @@ func (l *Logger) Update(o Options) error { return nil } +func (l *Logger) SetTemporaryWriter(w io.Writer) { + l.writer.SetTemporaryWriter(w) +} + +func (l *Logger) RemoveTemporaryWriter() { + l.writer.RemoveTemporaryWriter() +} + // Log implements log.Logger. 
func (l *Logger) Log(kvps ...interface{}) error { // Buffer logs before confirming log format is configured in `logging` block @@ -215,24 +227,63 @@ func (f *formatVar) Set(format Format) { type writerVar struct { mut sync.RWMutex - w io.Writer + + lokiWriter *lokiWriter + innerWriter io.Writer + tmpWriter io.Writer } -func (w *writerVar) Set(inner io.Writer) { +func (w *writerVar) SetTemporaryWriter(writer io.Writer) { w.mut.Lock() defer w.mut.Unlock() - w.w = inner + w.tmpWriter = writer } -func (w *writerVar) Write(p []byte) (n int, err error) { +func (w *writerVar) RemoveTemporaryWriter() { + w.mut.Lock() + defer w.mut.Unlock() + w.tmpWriter = nil +} + +func (w *writerVar) SetInnerWriter(writer io.Writer) { + w.mut.Lock() + defer w.mut.Unlock() + w.innerWriter = writer +} + +func (w *writerVar) SetLokiWriter(writer *lokiWriter) { + w.mut.Lock() + defer w.mut.Unlock() + w.lokiWriter = writer +} + +func (w *writerVar) Write(p []byte) (int, error) { w.mut.RLock() defer w.mut.RUnlock() - if w.w == nil { + if w.innerWriter == nil { return 0, fmt.Errorf("no writer available") } - return w.w.Write(p) + // The following is effectively an io.Multiwriter, but without updating + // the Multiwriter each time tmpWriter is added or removed. 
+ if _, err := w.innerWriter.Write(p); err != nil { + return 0, err + } + + if w.lokiWriter != nil { + if _, err := w.lokiWriter.Write(p); err != nil { + return 0, err + } + } + + if w.tmpWriter != nil { + if _, err := w.tmpWriter.Write(p); err != nil { + return 0, err + } + } + + return len(p), nil } type bufferedItem struct { diff --git a/internal/service/http/http.go b/internal/service/http/http.go index ff070b330b..590802b9b4 100644 --- a/internal/service/http/http.go +++ b/internal/service/http/http.go @@ -2,6 +2,7 @@ package http import ( + "bytes" "context" "crypto/tls" "fmt" @@ -11,14 +12,17 @@ import ( "os" "path" "sort" + "strconv" "strings" "sync" + "time" "github.com/go-kit/log" "github.com/gorilla/mux" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/featuregate" alloy_runtime "github.com/grafana/alloy/internal/runtime" + "github.com/grafana/alloy/internal/runtime/logging" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/service" "github.com/grafana/alloy/internal/service/remotecfg" @@ -40,16 +44,18 @@ const ServiceName = "http" // Options are used to configure the HTTP service. Options are constant for the // lifetime of the HTTP service. type Options struct { - Logger log.Logger // Where to send logs. + Logger *logging.Logger // Where to send logs. Tracer trace.TracerProvider // Where to send traces. Gatherer prometheus.Gatherer // Where to collect metrics from. ReadyFunc func() bool ReloadFunc func() (*alloy_runtime.Source, error) - HTTPListenAddr string // Address to listen for HTTP traffic on. - MemoryListenAddr string // Address to accept in-memory traffic on. - EnablePProf bool // Whether pprof endpoints should be exposed. + HTTPListenAddr string // Address to listen for HTTP traffic on. + MemoryListenAddr string // Address to accept in-memory traffic on. + EnablePProf bool // Whether pprof endpoints should be exposed. 
+ MinStability featuregate.Stability // Minimum stability level to utilize for feature gates + BundleContext SupportBundleContext // Context for delivering a support bundle } // Arguments holds runtime settings for the HTTP service. @@ -58,14 +64,20 @@ type Arguments struct { } type Service struct { - log log.Logger - tracer trace.TracerProvider - gatherer prometheus.Gatherer - opts Options + // globalLogger allows us to leverage the logging struct for setting a temporary + // logger for support bundle usage and still leverage log.With for logging in the service + globalLogger *logging.Logger + log log.Logger + tracer trace.TracerProvider + gatherer prometheus.Gatherer + opts Options winMut sync.Mutex win *server.WinCertStoreHandler + // Used to enforce single-flight requests to supportHandler + supportBundleMut sync.Mutex + // publicLis and tcpLis are used to lazily enable TLS, since TLS is // optionally configurable at runtime. // @@ -95,7 +107,7 @@ func New(opts Options) *Service { ) if l == nil { - l = log.NewNopLogger() + l = logging.NewNop() } if t == nil { t = noop.NewTracerProvider() @@ -113,10 +125,11 @@ func New(opts Options) *Service { _ = publicLis.SetInner(tcpLis) return &Service{ - log: l, - tracer: t, - gatherer: r, - opts: opts, + globalLogger: l, + log: log.With(l, "service", "http"), + tracer: t, + gatherer: r, + opts: opts, publicLis: publicLis, tcpLis: tcpLis, @@ -211,6 +224,9 @@ func (s *Service) Run(ctx context.Context, host service.Host) error { }).Methods(http.MethodGet, http.MethodPost) } + // Wire in support bundle generator + r.HandleFunc("/-/support", s.supportHandler).Methods("GET") + // Wire custom service handlers for services which depend on the http // service. 
// @@ -243,6 +259,70 @@ func (s *Service) Run(ctx context.Context, host service.Host) error { return nil } +func (s *Service) supportHandler(rw http.ResponseWriter, r *http.Request) { + s.supportBundleMut.Lock() + defer s.supportBundleMut.Unlock() + + // TODO(dehaansa) remove this check once the support bundle is generally available + if !s.opts.MinStability.Permits(featuregate.StabilityPublicPreview) { + rw.WriteHeader(http.StatusForbidden) + _, _ = rw.Write([]byte("support bundle generation is only available in public preview. Use the" + " --stability.level command-line flag to enable public-preview features")) + return + } + + if s.opts.BundleContext.DisableSupportBundle { + rw.WriteHeader(http.StatusForbidden) + _, _ = rw.Write([]byte("support bundle generation is disabled; it can be re-enabled by removing the --disable-support-bundle flag")) + return + } + + duration := getServerWriteTimeout(r) + if r.URL.Query().Has("duration") { + d, err := strconv.Atoi(r.URL.Query().Get("duration")) + if err != nil { + http.Error(rw, fmt.Sprintf("duration value (in seconds) should be a positive integer: %s", err), http.StatusBadRequest) + return + } + if d < 1 { + http.Error(rw, "duration value (in seconds) should be at least 1", http.StatusBadRequest) + return + } + if float64(d) > duration.Seconds() { + http.Error(rw, "duration value exceeds the server's write timeout", http.StatusBadRequest) + return + } + duration = time.Duration(d) * time.Second + } + ctx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + + var logsBuffer bytes.Buffer + syncBuff := log.NewSyncWriter(&logsBuffer) + s.globalLogger.SetTemporaryWriter(syncBuff) + defer func() { + s.globalLogger.RemoveTemporaryWriter() + }() + + bundle, err := ExportSupportBundle(ctx, s.opts.BundleContext.RuntimeFlags, s.opts.HTTPListenAddr, s.Data().(Data).DialFunc) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + if err := ServeSupportBundle(rw, 
bundle, &logsBuffer); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } +} + +func getServerWriteTimeout(r *http.Request) time.Duration { + srv, ok := r.Context().Value(http.ServerContextKey).(*http.Server) + if ok && srv.WriteTimeout != 0 { + return srv.WriteTimeout + } + return 30 * time.Second +} + // getServiceRoutes returns a sorted list of service routes for services which // depend on the HTTP service. // diff --git a/internal/service/http/http_test.go b/internal/service/http/http_test.go index bfc212471d..2481fcd6cb 100644 --- a/internal/service/http/http_test.go +++ b/internal/service/http/http_test.go @@ -169,7 +169,7 @@ func newTestEnvironment(t *testing.T) (*testEnvironment, error) { } svc := New(Options{ - Logger: util.TestLogger(t), + Logger: util.TestAlloyLogger(t), Tracer: noop.NewTracerProvider(), Gatherer: prometheus.NewRegistry(), diff --git a/internal/service/http/supportbundle.go b/internal/service/http/supportbundle.go new file mode 100644 index 0000000000..3c75c35150 --- /dev/null +++ b/internal/service/http/supportbundle.go @@ -0,0 +1,211 @@ +package http + +import ( + "archive/zip" + "bytes" + "context" + "fmt" + "io" + "net/http" + "path/filepath" + "runtime" + "runtime/pprof" + "strings" + "time" + + "github.com/grafana/alloy/internal/build" + "github.com/grafana/alloy/internal/static/server" + "github.com/mackerelio/go-osstat/uptime" + "gopkg.in/yaml.v3" +) + +// SupportBundleContext groups the relevant context that is used in the HTTP +// service config for the support bundle +type SupportBundleContext struct { + DisableSupportBundle bool // Whether support bundle endpoint should be disabled. + RuntimeFlags []string // Alloy runtime flags to send with support bundle +} + +// Bundle collects all the data that is exposed as a support bundle. 
+type Bundle struct { + meta []byte + alloyMetrics []byte + components []byte + peers []byte + runtimeFlags []byte + heapBuf *bytes.Buffer + goroutineBuf *bytes.Buffer + blockBuf *bytes.Buffer + mutexBuf *bytes.Buffer + cpuBuf *bytes.Buffer +} + +// Metadata contains general runtime information about the current Alloy environment. +type Metadata struct { + BuildVersion string `yaml:"build_version"` + OS string `yaml:"os"` + Architecture string `yaml:"architecture"` + Uptime float64 `yaml:"uptime"` +} + +// ExportSupportBundle gathers the information required for the support bundle. +func ExportSupportBundle(ctx context.Context, runtimeFlags []string, srvAddress string, dialContext server.DialContextFunc) (*Bundle, error) { + // The block profiler is disabled by default. Temporarily enable recording + // of all blocking events. Also, temporarily record all mutex contentions, + // and defer restoring of earlier mutex profiling fraction. + runtime.SetBlockProfileRate(1) + old := runtime.SetMutexProfileFraction(1) + defer func() { + runtime.SetBlockProfileRate(0) + runtime.SetMutexProfileFraction(old) + }() + + // Gather runtime metadata. + ut, err := uptime.Get() + if err != nil { + return nil, err + } + m := Metadata{ + BuildVersion: build.Version, + OS: runtime.GOOS, + Architecture: runtime.GOARCH, + Uptime: ut.Seconds(), + } + meta, err := yaml.Marshal(m) + if err != nil { + return nil, fmt.Errorf("failed to marshal support bundle metadata: %s", err) + } + + var httpClient http.Client + httpClient.Transport = &http.Transport{DialContext: dialContext} + // Gather Alloy's own metrics. 
+ alloyMetrics, err := retrieveAPIEndpoint(httpClient, srvAddress, "metrics") + if err != nil { + return nil, fmt.Errorf("failed to get internal Alloy metrics: %s", err) + } + // Gather running component configuration + components, err := retrieveAPIEndpoint(httpClient, srvAddress, "api/v0/web/components") + if err != nil { + return nil, fmt.Errorf("failed to get component details: %s", err) + } + // Gather cluster peers information + peers, err := retrieveAPIEndpoint(httpClient, srvAddress, "api/v0/web/peers") + if err != nil { + return nil, fmt.Errorf("failed to get peer details: %s", err) + } + + // Export pprof data. + var ( + cpuBuf bytes.Buffer + heapBuf bytes.Buffer + goroutineBuf bytes.Buffer + blockBuf bytes.Buffer + mutexBuf bytes.Buffer + ) + err = pprof.StartCPUProfile(&cpuBuf) + if err != nil { + return nil, err + } + deadline, _ := ctx.Deadline() + // Sleep for the remainder of the context deadline, but leave some time for + // the rest of the bundle to be exported successfully. + time.Sleep(time.Until(deadline) - 200*time.Millisecond) + pprof.StopCPUProfile() + + p := pprof.Lookup("heap") + if err := p.WriteTo(&heapBuf, 0); err != nil { + return nil, err + } + p = pprof.Lookup("goroutine") + if err := p.WriteTo(&goroutineBuf, 0); err != nil { + return nil, err + } + p = pprof.Lookup("block") + if err := p.WriteTo(&blockBuf, 0); err != nil { + return nil, err + } + p = pprof.Lookup("mutex") + if err := p.WriteTo(&mutexBuf, 0); err != nil { + return nil, err + } + + // Finally, bundle everything up to be served, either as a zip from + // memory, or exported to a directory. 
+ bundle := &Bundle{ + meta: meta, + alloyMetrics: alloyMetrics, + components: components, + peers: peers, + runtimeFlags: []byte(strings.Join(runtimeFlags, "\n")), + heapBuf: &heapBuf, + goroutineBuf: &goroutineBuf, + blockBuf: &blockBuf, + mutexBuf: &mutexBuf, + cpuBuf: &cpuBuf, + } + + return bundle, nil +} + +func retrieveAPIEndpoint(httpClient http.Client, srvAddress, endpoint string) ([]byte, error) { + url := fmt.Sprintf("http://%s/%s", srvAddress, endpoint) + resp, err := httpClient.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + res, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return res, nil +} + +// ServeSupportBundle serves the collected data and logs as a zip file over the given +// http.ResponseWriter. +func ServeSupportBundle(rw http.ResponseWriter, b *Bundle, logsBuf *bytes.Buffer) error { + zw := zip.NewWriter(rw) + rw.Header().Set("Content-Type", "application/zip") + rw.Header().Set("Content-Disposition", "attachment; filename=\"alloy-support-bundle.zip\"") + + zipStructure := map[string][]byte{ + "alloy-metadata.yaml": b.meta, + "alloy-components.json": b.components, + "alloy-peers.json": b.peers, + "alloy-metrics.txt": b.alloyMetrics, + "alloy-runtime-flags.txt": b.runtimeFlags, + "alloy-logs.txt": logsBuf.Bytes(), + "pprof/cpu.pprof": b.cpuBuf.Bytes(), + "pprof/heap.pprof": b.heapBuf.Bytes(), + "pprof/goroutine.pprof": b.goroutineBuf.Bytes(), + "pprof/mutex.pprof": b.mutexBuf.Bytes(), + "pprof/block.pprof": b.blockBuf.Bytes(), + } + + for fn, b := range zipStructure { + if b != nil { + path := append([]string{"alloy-support-bundle"}, strings.Split(fn, "/")...) 
+ if err := writeByteSlice(zw, b, path...); err != nil { + return err + } + } + } + + err := zw.Close() + if err != nil { + return fmt.Errorf("failed to flush the zip writer: %v", err) + } + return nil +} + +func writeByteSlice(zw *zip.Writer, b []byte, fn ...string) error { + f, err := zw.Create(filepath.Join(fn...)) + if err != nil { + return err + } + _, err = f.Write(b) + if err != nil { + return err + } + return nil +} diff --git a/syntax/internal/stdlib/stdlib.go b/syntax/internal/stdlib/stdlib.go index 995e538d4b..85dadd9c67 100644 --- a/syntax/internal/stdlib/stdlib.go +++ b/syntax/internal/stdlib/stdlib.go @@ -17,7 +17,15 @@ import ( "gopkg.in/yaml.v3" ) -// There identifiers are deprecated in favour of the namespaced ones. +// TODO: refactor the stdlib to have consistent naming between namespaces and identifiers. + +// ExperimentalIdentifiers contains the full name (namespace + identifier's name) of stdlib +// identifiers that are considered "experimental". +var ExperimentalIdentifiers = map[string]bool{ + "array.combine_maps": true, +} + +// These identifiers are deprecated in favour of the namespaced ones. var DeprecatedIdentifiers = map[string]interface{}{ "env": os.Getenv, "nonsensitive": nonSensitive, @@ -86,7 +94,8 @@ var str = map[string]interface{}{ } var array = map[string]interface{}{ - "concat": concat, + "concat": concat, + "combine_maps": combineMaps, } var convert = map[string]interface{}{ @@ -146,6 +155,138 @@ var concat = value.RawFunction(func(funcValue value.Value, args ...value.Value) return value.Array(raw...), nil }) +// This function assumes that the types of the value.Value objects are correct. 
+func shouldJoin(left value.Value, right value.Value, conditions value.Value) (bool, error) {
+	for i := 0; i < conditions.Len(); i++ {
+		condition := conditions.Index(i).Text()
+
+		leftVal, ok := left.Key(condition)
+		if !ok {
+			return false, nil
+		}
+
+		rightVal, ok := right.Key(condition)
+		if !ok {
+			return false, nil
+		}
+
+		if !leftVal.Equal(rightVal) {
+			return false, nil
+		}
+	}
+	return true, nil
+}
+
+// concatMaps merges two maps.
+// If a key exists in both maps, the value from the right map will be used.
+func concatMaps(left, right value.Value) (value.Value, error) {
+	res := make(map[string]value.Value)
+
+	for _, key := range left.Keys() {
+		val, ok := left.Key(key)
+		if !ok {
+			return value.Null, fmt.Errorf("concatMaps: key %s not found in left map while iterating - this should never happen", key)
+		}
+		res[key] = val
+	}
+
+	for _, key := range right.Keys() {
+		val, ok := right.Key(key)
+		if !ok {
+			return value.Null, fmt.Errorf("concatMaps: key %s not found in right map while iterating - this should never happen", key)
+		}
+		res[key] = val
+	}
+
+	return value.Object(res), nil
+}
+
+// Inputs:
+// args[0]: []map[string]string: lhs array
+// args[1]: []map[string]string: rhs array
+// args[2]: []string: merge conditions
+var combineMaps = value.RawFunction(func(funcValue value.Value, args ...value.Value) (value.Value, error) {
+	if len(args) != 3 {
+		return value.Value{}, fmt.Errorf("combine_maps: expected 3 arguments, got %d", len(args))
+	}
+
+	// Validate args[0] and args[1]
+	for i := range []int{0, 1} {
+		if args[i].Type() != value.TypeArray {
+			return value.Null, value.ArgError{
+				Function: funcValue,
+				Argument: args[i],
+				Index:    i,
+				Inner: value.TypeError{
+					Value:    args[i],
+					Expected: value.TypeArray,
+				},
+			}
+		}
+		for j := 0; j < args[i].Len(); j++ {
+			if args[i].Index(j).Type() != value.TypeObject {
+				return value.Null, value.ArgError{
+					Function: funcValue,
+					Argument: args[i].Index(j),
+					Index:    j,
+					Inner: value.TypeError{
+						Value:    args[i].Index(j),
+						Expected: value.TypeObject,
+					},
+				}
+			}
+		}
+	}
+
+	// Validate args[2]
+	if args[2].Type() != value.TypeArray {
+		return value.Null, value.ArgError{
+			Function: funcValue,
+			Argument: args[2],
+			Index:    2,
+			Inner: value.TypeError{
+				Value:    args[2],
+				Expected: value.TypeArray,
+			},
+		}
+	}
+	if args[2].Len() == 0 {
+		return value.Null, value.ArgError{
+			Function: funcValue,
+			Argument: args[2],
+			Index:    2,
+			Inner:    fmt.Errorf("combine_maps: merge conditions must not be empty"),
+		}
+	}
+
+	// We can't preallocate the result slice: we don't know in advance how many
+	// pairs of maps will satisfy the merge conditions. If none do, the result
+	// is empty.
+	res := []value.Value{}
+
+	for i := 0; i < args[0].Len(); i++ {
+		left := args[0].Index(i)
+		for j := 0; j < args[1].Len(); j++ {
+			right := args[1].Index(j)
+
+			join, err := shouldJoin(left, right, args[2])
+			if err != nil {
+				return value.Null, err
+			}
+
+			if join {
+				val, err := concatMaps(left, right)
+				if err != nil {
+					return value.Null, err
+				}
+				res = append(res, val)
+			}
+		}
+	}
+
+	return value.Array(res...), nil
+})
+
 func jsonDecode(in string) (interface{}, error) {
 	var res interface{}
 	err := json.Unmarshal([]byte(in), &res)
diff --git a/syntax/internal/value/value.go b/syntax/internal/value/value.go
index 3c8554b88c..829449370e 100644
--- a/syntax/internal/value/value.go
+++ b/syntax/internal/value/value.go
@@ -396,7 +396,7 @@ func (v Value) Key(key string) (index Value, ok bool) {
 //
 // An ArgError will be returned if one of the arguments is invalid. An Error
 // will be returned if the function call returns an error or if the number of
-// arguments doesn't match.
+// arguments doesn't match.
 func (v Value) Call(args ...Value) (Value, error) {
 	if v.ty != TypeFunction {
 		panic("syntax/value: Call called on non-function type")
@@ -553,3 +553,16 @@ func convertGoNumber(nval Number, target reflect.Type) reflect.Value {
 
 	panic("unsupported number conversion")
 }
+
+// Equal reports whether v is equal to rhs. It panics if the values are funcs,
+// maps, or slices.
+func (v Value) Equal(rhs Value) bool {
+	if v.Type() != rhs.Type() {
+		return false
+	}
+
+	if !v.rv.Equal(rhs.rv) {
+		return false
+	}
+
+	return true
+}
diff --git a/syntax/internal/value/value_test.go b/syntax/internal/value/value_test.go
index fbebcabdd7..ec52f192c9 100644
--- a/syntax/internal/value/value_test.go
+++ b/syntax/internal/value/value_test.go
@@ -140,6 +140,12 @@ func TestValue_Call(t *testing.T) {
 		require.Equal(t, int64(15+43), res.Int())
 	})
 
+	t.Run("equal - string", func(t *testing.T) {
+		v := value.String("aa")
+		w := value.String("aa")
+		require.True(t, v.Equal(w))
+	})
+
 	t.Run("fully variadic", func(t *testing.T) {
 		add := func(nums ...int) int {
 			var sum int
diff --git a/syntax/vm/error.go b/syntax/vm/error.go
index 7f3ada3d5a..38d9528a5e 100644
--- a/syntax/vm/error.go
+++ b/syntax/vm/error.go
@@ -44,6 +44,9 @@ func makeDiagnostic(err error, assoc map[value.Value]ast.Node) error {
 	case value.FieldError:
 		fmt.Fprintf(&expr, ".%s", ne.Field)
 		val = ne.Value
+	case value.ArgError:
+		message = ne.Inner.Error()
+		val = ne.Argument
 	}
 
 	cause = val
diff --git a/syntax/vm/vm.go b/syntax/vm/vm.go
index 71b2893ccc..f8d0e44341 100644
--- a/syntax/vm/vm.go
+++ b/syntax/vm/vm.go
@@ -509,3 +509,9 @@ func (s *Scope) IsStdlibDeprecated(name string) bool {
 	_, exist := stdlib.DeprecatedIdentifiers[name]
 	return exist
 }
+
+// IsStdlibExperimental returns true if the scoped identifier is experimental.
+func (s *Scope) IsStdlibExperimental(fullName string) bool { + _, exist := stdlib.ExperimentalIdentifiers[fullName] + return exist +} diff --git a/syntax/vm/vm_stdlib_test.go b/syntax/vm/vm_stdlib_test.go index e8009ae157..36a74c29ab 100644 --- a/syntax/vm/vm_stdlib_test.go +++ b/syntax/vm/vm_stdlib_test.go @@ -39,6 +39,99 @@ func TestVM_Stdlib(t *testing.T) { {"encoding.from_yaml nil field", "encoding.from_yaml(`foo: null`)", map[string]interface{}{"foo": nil}}, {"encoding.from_yaml nil array element", `encoding.from_yaml("[0, null]")`, []interface{}{0, nil}}, {"encoding.from_base64", `encoding.from_base64("Zm9vYmFyMTIzIT8kKiYoKSctPUB+")`, string(`foobar123!?$*&()'-=@~`)}, + + // Map tests + { + // Basic case. No conflicting key/val pairs. + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}], [{"a" = "a1", "c" = "c1"}], ["a"])`, + []map[string]interface{}{{"a": "a1", "b": "b1", "c": "c1"}}, + }, + { + // The first array has 2 maps, each with the same key/val pairs. + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}, {"a" = "a1", "b" = "b1"}], [{"a" = "a1", "c" = "c1"}], ["a"])`, + []map[string]interface{}{{"a": "a1", "b": "b1", "c": "c1"}, {"a": "a1", "b": "b1", "c": "c1"}}, + }, + { + // Non-unique merge criteria. + "array.combine_maps", + `array.combine_maps([{"pod" = "a", "lbl" = "q"}, {"pod" = "b", "lbl" = "q"}], [{"pod" = "c", "lbl" = "q"}, {"pod" = "d", "lbl" = "q"}], ["lbl"])`, + []map[string]interface{}{{"lbl": "q", "pod": "c"}, {"lbl": "q", "pod": "d"}, {"lbl": "q", "pod": "c"}, {"lbl": "q", "pod": "d"}}, + }, + { + // Basic case. Integer and string values. + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 2.2}], [{"a" = 1, "c" = "c1"}], ["a"])`, + []map[string]interface{}{{"a": 1, "b": 2.2, "c": "c1"}}, + }, + { + // The second map will override a value from the first. 
+ "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 2.2}], [{"a" = 1, "b" = "3.3"}], ["a"])`, + []map[string]interface{}{{"a": 1, "b": "3.3"}}, + }, + { + // Not enough matches for a join. + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 2.2}], [{"a" = 2, "b" = "3.3"}], ["a"])`, + []map[string]interface{}{}, + }, + { + // Not enough matches for a join. + // The "a" value has differing types. + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 2.2}], [{"a" = "1", "b" = "3.3"}], ["a"])`, + []map[string]interface{}{}, + }, + { + // Basic case. Some values are arrays and maps. + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = [1,2,3]}], [{"a" = 1, "c" = {"d" = {"e" = 10}}}], ["a"])`, + []map[string]interface{}{{"a": 1, "b": []interface{}{1, 2, 3}, "c": map[string]interface{}{"d": map[string]interface{}{"e": 10}}}}, + }, + { + // Join key not present in ARG2 + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}], [{"a" = 1, "n" = 2.1}, {"n" = 2.2}], ["a"])`, + []map[string]interface{}{{"a": 1, "n": 2.1}}, + }, + { + // Join key not present in ARG1 + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}, {"n" = 1.2}], [{"a" = 1, "n" = 2.1}], ["a"])`, + []map[string]interface{}{{"a": 1, "n": 2.1}}, + }, + { + // Join with multiple keys + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 3, "n" = 1.1}], [{"a" = 1, "b" = 3, "n" = 2.1}], ["a", "b"])`, + []map[string]interface{}{{"a": 1, "b": 3, "n": 2.1}}, + }, + { + // Join with multiple keys + // Some maps don't match all keys + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}, {"a" = 1, "b" = 3, "n" = 1.1}, {"b" = 3, "n" = 1.1}], [{"a" = 1, "n" = 2.3}, {"b" = 1, "n" = 2.3}, {"a" = 1, "b" = 3, "n" = 2.1}], ["a", "b"])`, + []map[string]interface{}{{"a": 1, "b": 3, "n": 2.1}}, + }, + { + // Join with multiple keys + // No match because one key is missing + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}, 
{"a" = 1, "b" = 3, "n" = 1.1}, {"b" = 3, "n" = 1.1}], [{"a" = 1, "n" = 2.3}, {"b" = 1, "n" = 2.3}, {"a" = 1, "b" = 3, "n" = 2.1}], ["a", "b", "c"])`, + []map[string]interface{}{}, + }, + { + // Multi match ends up with len(ARG1) * len(ARG2) maps + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}, {"a" = 1, "n" = 1.2}, {"a" = 1, "n" = 1.3}], [{"a" = 1, "n" = 2.1}, {"a" = 1, "n" = 2.2}, {"a" = 1, "n" = 2.3}], ["a"])`, + []map[string]interface{}{ + {"a": 1, "n": 2.1}, {"a": 1, "n": 2.2}, {"a": 1, "n": 2.3}, + {"a": 1, "n": 2.1}, {"a": 1, "n": 2.2}, {"a": 1, "n": 2.3}, + {"a": 1, "n": 2.1}, {"a": 1, "n": 2.2}, {"a": 1, "n": 2.3}, + }, + }, } for _, tc := range tt { @@ -55,6 +148,46 @@ func TestVM_Stdlib(t *testing.T) { } } +func TestVM_Stdlib_Errors(t *testing.T) { + tt := []struct { + name string + input string + expectedErr string + }{ + // Map tests + { + // Error: invalid RHS type - string. + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}], "a", ["a"])`, + `"a" should be array, got string`, + }, + { + // Error: invalid RHS type - an array with strings. + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}], ["a"], ["a"])`, + `"a" should be object, got string`, + }, + { + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}], [{"a" = "a1", "c" = "b1"}], [])`, + `combine_maps: merge conditions must not be empty`, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + expr, err := parser.ParseExpression(tc.input) + require.NoError(t, err) + + eval := vm.New(expr) + + rv := reflect.New(reflect.TypeOf([]map[string]interface{}{})) + err = eval.Evaluate(nil, rv.Interface()) + require.ErrorContains(t, err, tc.expectedErr) + }) + } +} + func TestStdlibCoalesce(t *testing.T) { t.Setenv("TEST_VAR2", "Hello!")