From de93a1bd541e314f993b79814ebe4296ba516c86 Mon Sep 17 00:00:00 2001
From: Trevor Whitney
Date: Mon, 14 Jun 2021 14:51:31 -0600
Subject: [PATCH] Promtail: add consul agent service discovery (#3834)

* working prototype

Signed-off-by: Trevor Whitney

* update test for new responses and metadata labels

Signed-off-by: Trevor Whitney

* fix linting issues

Signed-off-by: Trevor Whitney

* clarify license/copyright/attribution to prometheus code

Signed-off-by: Trevor Whitney

* undo changes to go.sum when adding consul dep

Signed-off-by: Trevor Whitney

* add documentation around consul and consulagent scrape configs

Signed-off-by: Trevor Whitney

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/configuration.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

Co-authored-by: Ed Welch
Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>
---
 .../promtail/discovery/consulagent/consul.go  | 574 ++++++++++++++++++
 .../discovery/consulagent/consul_test.go      | 478 +++++++++++++++
 .../pkg/promtail/scrapeconfig/scrapeconfig.go |   6 +
 .../sources/clients/promtail/configuration.md | 152 +++++
 go.mod                                        |   2 +
 vendor/modules.txt                            |   2 +
 6 files changed, 1214 insertions(+)
 create mode 100644 clients/pkg/promtail/discovery/consulagent/consul.go
 create mode 100644 clients/pkg/promtail/discovery/consulagent/consul_test.go

diff --git a/clients/pkg/promtail/discovery/consulagent/consul.go b/clients/pkg/promtail/discovery/consulagent/consul.go
new file mode 100644
index 0000000000000..a1e54b3e12534
--- /dev/null
+++ b/clients/pkg/promtail/discovery/consulagent/consul.go
@@ -0,0 +1,574 @@
+// This code was adapted from the consul service discovery
+// package in prometheus: https://github.com/prometheus/prometheus/blob/main/discovery/consul/consul.go
+// which is copyrighted: 2015 The Prometheus Authors
+// and licensed under the Apache License, Version 2.0 (the "License");
+
+package consulagent
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net"
+	"net/http"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	consul "github.com/hashicorp/consul/api"
+	conntrack "github.com/mwitkow/go-conntrack"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/prometheus/common/config"
+	"github.com/prometheus/common/model"
+
+	"github.com/prometheus/prometheus/discovery"
+	"github.com/prometheus/prometheus/discovery/targetgroup"
+	"github.com/prometheus/prometheus/util/strutil"
+)
+
+const (
+	watchTimeout  = 2 * time.Minute
+	retryInterval = 15 * time.Second
+
+	// addressLabel is the name for the label containing a target's address.
+	addressLabel = model.MetaLabelPrefix + "consulagent_address"
+	// nodeLabel is the name for the label containing a target's node name.
+	nodeLabel = model.MetaLabelPrefix + "consulagent_node"
+	// metaDataLabel is the prefix for the labels mapping to a target's metadata.
+	metaDataLabel = model.MetaLabelPrefix + "consulagent_metadata_"
+	// serviceMetaDataLabel is the prefix for the labels mapping to a target's service metadata.
+	serviceMetaDataLabel = model.MetaLabelPrefix + "consulagent_service_metadata_"
+	// tagsLabel is the name of the label containing the tags assigned to the target.
+	tagsLabel = model.MetaLabelPrefix + "consulagent_tags"
+	// serviceLabel is the name of the label containing the service name.
+	serviceLabel = model.MetaLabelPrefix + "consulagent_service"
+	// healthLabel is the name of the label containing the health of the service instance.
+	healthLabel = model.MetaLabelPrefix + "consulagent_health"
+	// serviceAddressLabel is the name of the label containing the (optional) service address.
+	serviceAddressLabel = model.MetaLabelPrefix + "consulagent_service_address"
+	// servicePortLabel is the name of the label containing the service port.
+	servicePortLabel = model.MetaLabelPrefix + "consulagent_service_port"
+	// datacenterLabel is the name of the label containing the datacenter ID.
+	datacenterLabel = model.MetaLabelPrefix + "consulagent_dc"
+	// taggedAddressesLabel is the prefix for the labels mapping to a target's tagged addresses.
+	taggedAddressesLabel = model.MetaLabelPrefix + "consulagent_tagged_address_"
+	// serviceIDLabel is the name of the label containing the service ID.
+	serviceIDLabel = model.MetaLabelPrefix + "consulagent_service_id"
+
+	// Constants for instrumentation.
+	namespace = "prometheus"
+)
+
+var (
+	rpcFailuresCount = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "sd_consulagent_rpc_failures_total",
+			Help:      "The number of Consul Agent RPC call failures.",
+		})
+	rpcDuration = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Namespace:  namespace,
+			Name:       "sd_consulagent_rpc_duration_seconds",
+			Help:       "The duration of a Consul Agent RPC call in seconds.",
+			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
+		},
+		[]string{"endpoint", "call"},
+	)
+
+	// Initialize metric vectors.
+	servicesRPCDuration = rpcDuration.WithLabelValues("agent", "services")
+	serviceRPCDuration  = rpcDuration.WithLabelValues("agent", "service")
+
+	// DefaultSDConfig is the default Consul Agent SD configuration.
+	DefaultSDConfig = SDConfig{
+		TagSeparator:    ",",
+		Scheme:          "http",
+		Server:          "localhost:8500",
+		AllowStale:      true,
+		RefreshInterval: model.Duration(30 * time.Second),
+	}
+)
+
+func init() {
+	discovery.RegisterConfig(&SDConfig{})
+	prometheus.MustRegister(rpcFailuresCount)
+	prometheus.MustRegister(rpcDuration)
+}
+
+// SDConfig is the configuration for Consul Agent service discovery.
+type SDConfig struct {
+	Server       string        `yaml:"server,omitempty"`
+	Token        config.Secret `yaml:"token,omitempty"`
+	Datacenter   string        `yaml:"datacenter,omitempty"`
+	TagSeparator string        `yaml:"tag_separator,omitempty"`
+	Scheme       string        `yaml:"scheme,omitempty"`
+	Username     string        `yaml:"username,omitempty"`
+	Password     config.Secret `yaml:"password,omitempty"`
+
+	// See https://www.consul.io/docs/internals/consensus.html#consistency-modes,
+	// stale reads are a lot cheaper and are a necessity if you have >5k targets.
+	AllowStale bool `yaml:"allow_stale"`
+	// By default use blocking queries (https://www.consul.io/api/index.html#blocking-queries)
+	// but allow users to throttle updates if necessary. This can be useful because of "bugs" like
+	// https://github.com/hashicorp/consul/issues/3712 which cause an unnecessary
+	// number of requests to Consul.
+	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
+
+	// See https://www.consul.io/api/catalog.html#list-services
+	// The list of services for which targets are discovered.
+	// Defaults to all services if empty.
+	Services []string `yaml:"services,omitempty"`
+	// A list of tags used to filter instances inside a service. Services must contain all tags in the list.
+	ServiceTags []string `yaml:"tags,omitempty"`
+	// Desired node metadata.
+	NodeMeta map[string]string `yaml:"node_meta,omitempty"`
+
+	TLSConfig config.TLSConfig `yaml:"tls_config,omitempty"`
+}
+
+// Name returns the name of the Config.
+func (*SDConfig) Name() string { return "consulagent" }
+
+// NewDiscoverer returns a Discoverer for the Config.
+func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
+	return NewDiscovery(c, opts.Logger)
+}
+
+// SetDirectory joins any relative file paths with dir.
+func (c *SDConfig) SetDirectory(dir string) {
+	c.TLSConfig.SetDirectory(dir)
+}
+
+// UnmarshalYAML implements the yaml.Unmarshaler interface.
+func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	*c = DefaultSDConfig
+	type plain SDConfig
+	err := unmarshal((*plain)(c))
+	if err != nil {
+		return err
+	}
+	if strings.TrimSpace(c.Server) == "" {
+		return errors.New("consulagent SD configuration requires a server address")
+	}
+	return nil
+}
+
+// Discovery retrieves target information from a Consul server
+// and updates them via watches.
+type Discovery struct {
+	client           *consul.Client
+	clientDatacenter string
+	tagSeparator     string
+	watchedServices  []string // Set of services which will be discovered.
+	watchedTags      []string // Tags used to filter instances of a service.
+	watchedNodeMeta  map[string]string
+	allowStale       bool
+	refreshInterval  time.Duration
+	finalizer        func()
+	logger           log.Logger
+}
+
+// NewDiscovery returns a new Discovery for the given config.
+func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+
+	tls, err := config.NewTLSConfig(&conf.TLSConfig)
+	if err != nil {
+		return nil, err
+	}
+	transport := &http.Transport{
+		IdleConnTimeout: 2 * watchTimeout,
+		TLSClientConfig: tls,
+		DialContext: conntrack.NewDialContextFunc(
+			conntrack.DialWithTracing(),
+			conntrack.DialWithName("consulagent_sd"),
+		),
+	}
+	wrapper := &http.Client{
+		Transport: transport,
+		Timeout:   watchTimeout + 15*time.Second,
+	}
+
+	clientConf := &consul.Config{
+		Address:    conf.Server,
+		Scheme:     conf.Scheme,
+		Datacenter: conf.Datacenter,
+		Token:      string(conf.Token),
+		HttpAuth: &consul.HttpBasicAuth{
+			Username: conf.Username,
+			Password: string(conf.Password),
+		},
+		HttpClient: wrapper,
+	}
+	client, err := consul.NewClient(clientConf)
+	if err != nil {
+		return nil, err
+	}
+	cd := &Discovery{
+		client:           client,
+		tagSeparator:     conf.TagSeparator,
+		watchedServices:  conf.Services,
+		watchedTags:      conf.ServiceTags,
+		watchedNodeMeta:  conf.NodeMeta,
+		allowStale:       conf.AllowStale,
+		refreshInterval:  time.Duration(conf.RefreshInterval),
+		clientDatacenter: conf.Datacenter,
+		finalizer:        transport.CloseIdleConnections,
+		logger:           logger,
+	}
+	return cd, nil
+}
+
+// shouldWatch returns whether the service of the given name should be watched.
+func (d *Discovery) shouldWatch(name string, tags []string) bool {
+	return d.shouldWatchFromName(name) && d.shouldWatchFromTags(tags)
+}
+
+// shouldWatchFromName returns whether the service of the given name should be watched based on its name.
+func (d *Discovery) shouldWatchFromName(name string) bool {
+	// If there's no fixed set of watched services, we watch everything.
+	if len(d.watchedServices) == 0 {
+		return true
+	}
+
+	for _, sn := range d.watchedServices {
+		if sn == name {
+			return true
+		}
+	}
+	return false
+}
+
+// shouldWatchFromTags returns whether the service of the given name should be watched based on its tags.
+// This gets called when the user doesn't specify a list of services in order to avoid watching
+// *all* services. Details in https://github.com/prometheus/prometheus/pull/3814
+func (d *Discovery) shouldWatchFromTags(tags []string) bool {
+	// If there's no fixed set of watched tags, we watch everything.
+	if len(d.watchedTags) == 0 {
+		return true
+	}
+
+tagOuter:
+	for _, wtag := range d.watchedTags {
+		for _, tag := range tags {
+			if wtag == tag {
+				continue tagOuter
+			}
+		}
+		return false
+	}
+	return true
+}
+
+// Get the local datacenter if not specified.
+func (d *Discovery) getDatacenter() error {
+	// If the datacenter was not set from clientConf, let's get it from the local Consul agent
+	// (Consul default is to use local node's datacenter if one isn't given for a query).
+	if d.clientDatacenter != "" {
+		return nil
+	}
+	info, err := d.client.Agent().Self()
+	if err != nil {
+		level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
+		rpcFailuresCount.Inc()
+		return err
+	}
+
+	dc, ok := info["Config"]["Datacenter"].(string)
+	if !ok {
+		err := errors.Errorf("invalid value '%v' for Config.Datacenter", info["Config"]["Datacenter"])
+		level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
+		return err
+	}
+
+	d.clientDatacenter = dc
+	return nil
+}
+
+// Initialize the Discoverer run.
+func (d *Discovery) initialize(ctx context.Context) {
+	// Loop until we manage to get the local datacenter.
+	for {
+		// We have to check the context at least once. 
The checks during channel sends + // do not guarantee that. + select { + case <-ctx.Done(): + return + default: + } + + // Get the local datacenter first, if necessary. + err := d.getDatacenter() + if err != nil { + time.Sleep(retryInterval) + continue + } + // We are good to go. + return + } +} + +// Run implements the Discoverer interface. +func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + if d.finalizer != nil { + defer d.finalizer() + } + d.initialize(ctx) + + if len(d.watchedServices) == 0 || len(d.watchedTags) != 0 { + // We need to watch the agent. + ticker := time.NewTicker(d.refreshInterval) + + // Watched services and their cancellation functions. + services := make(map[string]func()) + + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + default: + d.watchServices(ctx, ch, services) + <-ticker.C + } + } + } else { + // We only have fully defined services. + for _, name := range d.watchedServices { + d.watchService(ctx, ch, name) + } + <-ctx.Done() + } +} + +// Watch the catalog for new services we would like to watch. This is called only +// when we don't know yet the names of the services and need to ask Consul the +// entire list of services. +func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, services map[string]func()) { + agent := d.client.Agent() + level.Debug(d.logger).Log("msg", "Watching services", "tags", strings.Join(d.watchedTags, ",")) + + t0 := time.Now() + srvs, err := agent.Services() + elapsed := time.Since(t0) + servicesRPCDuration.Observe(elapsed.Seconds()) + + // Check the context before in order to exit early. + select { + case <-ctx.Done(): + return + default: + } + + if err != nil { + level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err) + rpcFailuresCount.Inc() + time.Sleep(retryInterval) + return + } + + discoveredServices := make(map[string]*consul.AgentService) + for _, srv := range srvs { + name := srv.Service + discoveredServices[name] = srv + + // use service name and tags to only watch + // the services that have the tag we are looking for (if specified). + // When no tags have been specified this will return true. + if !d.shouldWatch(name, srv.Tags) { + continue + } + if _, ok := services[name]; ok { + continue // We are already watching the service. + } + + wctx, cancel := context.WithCancel(ctx) + d.watchService(wctx, ch, name) + services[name] = cancel + } + + // Check for removed services. + for name, cancel := range services { + if _, ok := discoveredServices[name]; !ok { + level.Debug(d.logger).Log( + "msg", "removing service since consul no longer has a record of it", + "name", name) + // Call the watch cancellation function. + cancel() + delete(services, name) + + // Send clearing target group. + select { + case <-ctx.Done(): + return + case ch <- []*targetgroup.Group{{Source: name}}: + } + } + } + + // Send targetgroup with no targets if nothing was discovered. + if len(services) == 0 { + select { + case <-ctx.Done(): + return + case ch <- []*targetgroup.Group{{}}: + } + } +} + +// consulService contains data belonging to the same service. +type consulService struct { + name string + tags []string + labels model.LabelSet + discovery *Discovery + client *consul.Client + tagSeparator string + logger log.Logger +} + +// Start watching a service. 
+func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.Group, name string) { + srv := &consulService{ + discovery: d, + client: d.client, + name: name, + tags: d.watchedTags, + labels: model.LabelSet{ + serviceLabel: model.LabelValue(name), + datacenterLabel: model.LabelValue(d.clientDatacenter), + }, + tagSeparator: d.tagSeparator, + logger: d.logger, + } + + go func() { + ticker := time.NewTicker(d.refreshInterval) + defer ticker.Stop() + agent := srv.client.Agent() + for { + select { + case <-ctx.Done(): + return + default: + srv.watch(ctx, ch, agent) + select { + case <-ticker.C: + case <-ctx.Done(): + return + } + } + } + }() +} + +// Get updates for a service. +func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Group, agent *consul.Agent) { + level.Debug(srv.logger).Log("msg", "Watching service", "service", srv.name, "tags", strings.Join(srv.tags, ",")) + + t0 := time.Now() + aggregatedStatus, serviceChecks, err := agent.AgentHealthServiceByName(srv.name) + elapsed := time.Since(t0) + serviceRPCDuration.Observe(elapsed.Seconds()) + + // Check the context before in order to exit early. + select { + case <-ctx.Done(): + return + default: + // Continue. + } + + if err != nil { + level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "tags", strings.Join(srv.tags, ","), "err", err) + rpcFailuresCount.Inc() + time.Sleep(retryInterval) + return + } + + self, err := agent.Self() + if err != nil { + level.Error(srv.logger).Log("msg", "failed to get agent info from agent api", "err", err) + return + } + var member = consul.AgentMember{} + memberBytes, err := json.Marshal(self["Member"]) + if err != nil { + level.Error(srv.logger).Log("msg", "failed to get member information from agent", "err", err) + return + } + err = json.Unmarshal(memberBytes, &member) + if err != nil { + level.Error(srv.logger).Log("msg", "failed to unmarshal member information from agent", "err", err) + return + } + + nodeName := self["Config"]["NodeName"].(string) + meta := self["Meta"] + + tgroup := targetgroup.Group{ + Source: srv.name, + Labels: srv.labels, + Targets: make([]model.LabelSet, 0, len(serviceChecks)), + } + + for _, srvCheck := range serviceChecks { + // We surround the separated list with the separator as well. This way regular expressions + // in relabeling rules don't have to consider tag positions. + var tags = srv.tagSeparator + strings.Join(srvCheck.Service.Tags, srv.tagSeparator) + srv.tagSeparator + + // If the service address is not empty it should be used instead of the node address + // since the service may be registered remotely through a different node. + var addr string + if srvCheck.Service.Address != "" { + addr = net.JoinHostPort(srvCheck.Service.Address, fmt.Sprintf("%d", srvCheck.Service.Port)) + } else { + addr = net.JoinHostPort(member.Addr, fmt.Sprintf("%d", srvCheck.Service.Port)) + } + + labels := model.LabelSet{ + model.AddressLabel: model.LabelValue(addr), + addressLabel: model.LabelValue(member.Addr), + nodeLabel: model.LabelValue(nodeName), + tagsLabel: model.LabelValue(tags), + serviceAddressLabel: model.LabelValue(srvCheck.Service.Address), + servicePortLabel: model.LabelValue(strconv.Itoa(srvCheck.Service.Port)), + serviceIDLabel: model.LabelValue(srvCheck.Service.ID), + healthLabel: model.LabelValue(aggregatedStatus), + } + + // Add all key/value pairs from the node's metadata as their own labels. 
+ for k, v := range meta { + if str, ok := v.(string); ok { + name := strutil.SanitizeLabelName(k) + labels[metaDataLabel+model.LabelName(name)] = model.LabelValue(str) + } + } + + // Add all key/value pairs from the service's metadata as their own labels. + for k, v := range srvCheck.Service.Meta { + name := strutil.SanitizeLabelName(k) + labels[serviceMetaDataLabel+model.LabelName(name)] = model.LabelValue(v) + } + + // Add all key/value pairs from the service's tagged addresses as their own labels. + for k, v := range srvCheck.Service.TaggedAddresses { + name := strutil.SanitizeLabelName(k) + address := fmt.Sprintf("%s:%d", v.Address, v.Port) + labels[taggedAddressesLabel+model.LabelName(name)] = model.LabelValue(address) + } + + tgroup.Targets = append(tgroup.Targets, labels) + } + + select { + case <-ctx.Done(): + case ch <- []*targetgroup.Group{&tgroup}: + } +} diff --git a/clients/pkg/promtail/discovery/consulagent/consul_test.go b/clients/pkg/promtail/discovery/consulagent/consul_test.go new file mode 100644 index 0000000000000..85d2b964be945 --- /dev/null +++ b/clients/pkg/promtail/discovery/consulagent/consul_test.go @@ -0,0 +1,478 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consulagent + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +//nolint:interfacer // this follows the pattern in prometheus service discovery +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +func TestConfiguredService(t *testing.T) { + conf := &SDConfig{ + Services: []string{"configuredServiceName"}} + consulDiscovery, err := NewDiscovery(conf, nil) + + if err != nil { + t.Errorf("Unexpected error when initializing discovery %v", err) + } + if !consulDiscovery.shouldWatch("configuredServiceName", []string{""}) { + t.Errorf("Expected service %s to be watched", "configuredServiceName") + } + if consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}) { + t.Errorf("Expected service %s to not be watched", "nonConfiguredServiceName") + } +} + +func TestConfiguredServiceWithTag(t *testing.T) { + conf := &SDConfig{ + Services: []string{"configuredServiceName"}, + ServiceTags: []string{"http"}, + } + consulDiscovery, err := NewDiscovery(conf, nil) + + if err != nil { + t.Errorf("Unexpected error when initializing discovery %v", err) + } + if consulDiscovery.shouldWatch("configuredServiceName", []string{""}) { + t.Errorf("Expected service %s to not be watched without tag", "configuredServiceName") + } + if !consulDiscovery.shouldWatch("configuredServiceName", []string{"http"}) { + t.Errorf("Expected service %s to be watched with tag %s", "configuredServiceName", "http") + } + if consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}) { + t.Errorf("Expected service %s to not 
be watched without tag", "nonConfiguredServiceName") + } + if consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{"http"}) { + t.Errorf("Expected service %s to not be watched with tag %s", "nonConfiguredServiceName", "http") + } +} + +func TestConfiguredServiceWithTags(t *testing.T) { + type testcase struct { + // What we've configured to watch. + conf *SDConfig + // The service we're checking if we should watch or not. + serviceName string + serviceTags []string + shouldWatch bool + } + + cases := []testcase{ + { + conf: &SDConfig{ + Services: []string{"configuredServiceName"}, + ServiceTags: []string{"http", "v1"}, + }, + serviceName: "configuredServiceName", + serviceTags: []string{""}, + shouldWatch: false, + }, + { + conf: &SDConfig{ + Services: []string{"configuredServiceName"}, + ServiceTags: []string{"http", "v1"}, + }, + serviceName: "configuredServiceName", + serviceTags: []string{"http", "v1"}, + shouldWatch: true, + }, + { + conf: &SDConfig{ + Services: []string{"configuredServiceName"}, + ServiceTags: []string{"http", "v1"}, + }, + serviceName: "nonConfiguredServiceName", + serviceTags: []string{""}, + shouldWatch: false, + }, + { + conf: &SDConfig{ + Services: []string{"configuredServiceName"}, + ServiceTags: []string{"http", "v1"}, + }, + serviceName: "nonConfiguredServiceName", + serviceTags: []string{"http, v1"}, + shouldWatch: false, + }, + { + conf: &SDConfig{ + Services: []string{"configuredServiceName"}, + ServiceTags: []string{"http", "v1"}, + }, + serviceName: "configuredServiceName", + serviceTags: []string{"http", "v1", "foo"}, + shouldWatch: true, + }, + { + conf: &SDConfig{ + Services: []string{"configuredServiceName"}, + ServiceTags: []string{"http", "v1", "foo"}, + }, + serviceName: "configuredServiceName", + serviceTags: []string{"http", "v1", "foo"}, + shouldWatch: true, + }, + { + conf: &SDConfig{ + Services: []string{"configuredServiceName"}, + ServiceTags: []string{"http", "v1"}, + }, + serviceName: "configuredServiceName", + serviceTags: []string{"http", "v1", "v1"}, + shouldWatch: true, + }, + } + + for _, tc := range cases { + consulDiscovery, err := NewDiscovery(tc.conf, nil) + + if err != nil { + t.Errorf("Unexpected error when initializing discovery %v", err) + } + ret := consulDiscovery.shouldWatch(tc.serviceName, tc.serviceTags) + if ret != tc.shouldWatch { + t.Errorf("Expected should watch? %t, got %t. 
Watched service and tags: %s %+v, input was %s %+v", tc.shouldWatch, ret, tc.conf.Services, tc.conf.ServiceTags, tc.serviceName, tc.serviceTags) + } + + } +} + +func TestNonConfiguredService(t *testing.T) { + conf := &SDConfig{} + consulDiscovery, err := NewDiscovery(conf, nil) + + if err != nil { + t.Errorf("Unexpected error when initializing discovery %v", err) + } + if !consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}) { + t.Errorf("Expected service %s to be watched", "nonConfiguredServiceName") + } +} + +const ( + AgentAnswer = `{ + "Config": { + "Datacenter": "test-dc", + "NodeName": "test-node", + "NodeID": "efd2573b-4c48-312b-2097-99bffc4352c4", + "Revision": "a9322b9c7", + "Server": false, + "Version": "1.8.3" + } +}` + ServiceTestAnswer = ` +[{ + "AggregatedStatus": "passing", + "Service": { + "ID": "test-id-1234", + "Service": "test", + "Tags": ["tag1"], + "Address": "", + "Meta": {"version":"1.0.0","environment":"staging"}, + "Port": 3341, + "Weights": { + "Passing": 1, + "Warning": 1 + }, + "EnableTagOverride": false, + "ProxyDestination": "", + "Proxy": {}, + "Connect": {}, + "CreateIndex": 1, + "ModifyIndex": 1 + }, + "Checks": [{ + "Node": "node1", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "passing" + }] +}]` + ServiceOtherAnswer = ` +[{ + "AggregatedStatus": "passing", + "Service": { + "ID": "other-id-5678", + "Service": "other", + "Tags": ["tag2"], + "Address": "", + "Meta": {"version":"1.0.0","environment":"staging"}, + "Port": 0, + "Weights": { + "Passing": 1, + "Warning": 1 + }, + "EnableTagOverride": false, + "ProxyDestination": "", + "Proxy": {}, + "Connect": {}, + "CreateIndex": 1, + "ModifyIndex": 1 + }, + "Checks": [{ + "Node": "node1", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "passing" + }] +}]` + + ServicesTestAnswer = ` +{ + "test-id-1234": { + "ID": "test-id-1234", + "Service": "test", + "Tags": [ "tag1" ], + "Meta": {"version":"1.0.0","environment":"staging"}, + "Port": 3341, + "Address": "1.1.1.1", + "TaggedAddresses": { + "lan_ipv4": { + "Address": "1.1.1.1", + "Port": 4646 + }, + "wan_ipv4": { + "Address": "1.1.1.1", + "Port": 4646 + } + }, + "Weights": { + "Passing": 1, + "Warning": 1 + }, + "EnableTagOverride": false + }, + "test-id-5678": { + "ID": "test-id-5678", + "Service": "test", + "Tags": [ "tag1" ], + "Meta": {"version":"1.0.0","environment":"staging"}, + "Port": 3341, + "Address": "1.1.2.2", + "TaggedAddresses": { + "lan_ipv4": { + "Address": "1.1.2.2", + "Port": 4646 + }, + "wan_ipv4": { + "Address": "1.1.2.2", + "Port": 4646 + } + }, + "Weights": { + "Passing": 1, + "Warning": 1 + }, + "EnableTagOverride": false + }, + "other-id-9876": { + "ID": "other-id-9876", + "Service": "other", + "Tags": [ "tag2" ], + "Meta": {"version":"1.0.0","environment":"staging"}, + "Port": 0, + "Address": "", + "Weights": { + "Passing": 1, + "Warning": 1 + }, + "EnableTagOverride": false + } +}` +) + +func newServer(t *testing.T) (*httptest.Server, *SDConfig) { + // github.com/hashicorp/consul/testutil/ would be nice but it needs a local consul binary. 
+ stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := "" + switch r.URL.String() { + case "/v1/agent/self": + response = AgentAnswer + case "/v1/agent/health/service/name/test?format=json": + response = ServiceTestAnswer + case "/v1/agent/health/service/name/other?format=json": + response = ServiceOtherAnswer + case "/v1/agent/services": + response = ServicesTestAnswer + default: + t.Errorf("Unhandled consul call: %s", r.URL) + } + w.Header().Add("X-Consul-Index", "1") + _, err := w.Write([]byte(response)) + require.NoError(t, err) + })) + stuburl, err := url.Parse(stub.URL) + require.NoError(t, err) + + config := &SDConfig{ + Server: stuburl.Host, + Token: "fake-token", + RefreshInterval: model.Duration(1 * time.Second), + } + return stub, config +} + +func newDiscovery(t *testing.T, config *SDConfig) *Discovery { + logger := log.NewNopLogger() + d, err := NewDiscovery(config, logger) + require.NoError(t, err) + return d +} + +func checkOneTarget(t *testing.T, tg []*targetgroup.Group) { + require.Equal(t, 1, len(tg)) + target := tg[0] + require.Equal(t, "test-dc", string(target.Labels["__meta_consulagent_dc"])) + require.Equal(t, target.Source, string(target.Labels["__meta_consulagent_service"])) + if target.Source == "test" { + // test service should have one node. + require.Greater(t, len(target.Targets), 0, "Test service should have one node") + } +} + +// Watch all the services in the catalog. +func TestAllServices(t *testing.T) { + stub, config := newServer(t) + defer stub.Close() + + d := newDiscovery(t, config) + + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan []*targetgroup.Group) + go func() { + d.Run(ctx, ch) + close(ch) + }() + checkOneTarget(t, <-ch) + checkOneTarget(t, <-ch) + cancel() + <-ch +} + +// targetgroup with no targets is emitted if no services were discovered. +func TestNoTargets(t *testing.T) { + stub, config := newServer(t) + defer stub.Close() + config.ServiceTags = []string{"missing"} + + d := newDiscovery(t, config) + + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan []*targetgroup.Group) + go d.Run(ctx, ch) + + targets := (<-ch)[0].Targets + require.Equal(t, 0, len(targets)) + cancel() +} + +// Watch only the test service. +func TestOneService(t *testing.T) { + stub, config := newServer(t) + defer stub.Close() + + config.Services = []string{"test"} + d := newDiscovery(t, config) + + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan []*targetgroup.Group) + go d.Run(ctx, ch) + checkOneTarget(t, <-ch) + cancel() +} + +// Watch the test service with a specific tag and node-meta. +func TestAllOptions(t *testing.T) { + stub, config := newServer(t) + defer stub.Close() + + config.Services = []string{"test"} + config.NodeMeta = map[string]string{"rack_name": "2304"} + config.ServiceTags = []string{"tag1"} + config.AllowStale = true + config.Token = "fake-token" + + d := newDiscovery(t, config) + + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan []*targetgroup.Group) + go func() { + d.Run(ctx, ch) + close(ch) + }() + checkOneTarget(t, <-ch) + cancel() + <-ch +} + +func TestGetDatacenterShouldReturnError(t *testing.T) { + for _, tc := range []struct { + handler func(http.ResponseWriter, *http.Request) + errMessage string + }{ + { + // Define a handler that will return status 500. 
+			handler: func(w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(500)
+			},
+			errMessage: "Unexpected response code: 500 ()",
+		},
+		{
+			// Define a handler that will return incorrect response.
+			handler: func(w http.ResponseWriter, r *http.Request) {
+				_, err := w.Write([]byte(`{"Config": {"Not-Datacenter": "test-dc"}}`))
+				require.NoError(t, err)
+			},
+			errMessage: "invalid value '' for Config.Datacenter",
+		},
+	} {
+		stub := httptest.NewServer(http.HandlerFunc(tc.handler))
+		stuburl, err := url.Parse(stub.URL)
+		require.NoError(t, err)
+
+		config := &SDConfig{
+			Server:          stuburl.Host,
+			Token:           "fake-token",
+			RefreshInterval: model.Duration(1 * time.Second),
+		}
+		defer stub.Close()
+		d := newDiscovery(t, config)
+
+		// Should be empty if not initialized.
+		require.Equal(t, "", d.clientDatacenter)
+
+		err = d.getDatacenter()
+
+		// An error should be returned.
+		require.Equal(t, tc.errMessage, err.Error())
+		// Should still be empty.
+		require.Equal(t, "", d.clientDatacenter)
+	}
+}
diff --git a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go
index a5017c09397ca..12628c5351c0c 100644
--- a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go
+++ b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go
@@ -24,6 +24,7 @@ import (
 	"github.com/weaveworks/common/server"
 
 	"github.com/grafana/loki/clients/pkg/logentry/stages"
+	"github.com/grafana/loki/clients/pkg/promtail/discovery/consulagent"
 )
 
 // Config describes a job to scrape.
@@ -48,6 +49,8 @@ type ServiceDiscoveryConfig struct {
 	FileSDConfigs []*file.SDConfig `yaml:"file_sd_configs,omitempty"`
 	// List of Consul service discovery configurations.
 	ConsulSDConfigs []*consul.SDConfig `yaml:"consul_sd_configs,omitempty"`
+	// List of Consul agent service discovery configurations.
+	ConsulAgentSDConfigs []*consulagent.SDConfig `yaml:"consulagent_sd_configs,omitempty"`
 	// List of DigitalOcean service discovery configurations.
 	DigitalOceanSDConfigs []*digitalocean.SDConfig `yaml:"digitalocean_sd_configs,omitempty"`
 	// List of Docker Swarm service discovery configurations.
@@ -85,6 +88,9 @@ func (cfg ServiceDiscoveryConfig) Configs() (res discovery.Configs) {
 	for _, x := range cfg.ConsulSDConfigs {
 		res = append(res, x)
 	}
+	for _, x := range cfg.ConsulAgentSDConfigs {
+		res = append(res, x)
+	}
 	for _, x := range cfg.DigitalOceanSDConfigs {
 		res = append(res, x)
 	}
diff --git a/docs/sources/clients/promtail/configuration.md b/docs/sources/clients/promtail/configuration.md
index cf15ef3592e9b..486db72c6862a 100644
--- a/docs/sources/clients/promtail/configuration.md
+++ b/docs/sources/clients/promtail/configuration.md
@@ -308,6 +308,16 @@ file_sd_configs:
 # same host.
 kubernetes_sd_configs:
   - [<kubernetes_sd_config>]
+
+# Describes how to use the Consul Catalog API to discover services registered with the
+# consul cluster.
+consul_sd_configs:
+  [ - <consul_sd_config> ... ]
+
+# Describes how to use the Consul Agent API to discover services registered with the consul agent
+# running on the same host as Promtail.
+consulagent_sd_configs:
+  [ - <consulagent_sd_config> ... ]
 ```
 
 ### pipeline_stages
@@ -1152,6 +1162,148 @@ You may wish to check out the 3rd party
 [Prometheus Operator](https://github.com/coreos/prometheus-operator),
 which automates the Prometheus setup on top of Kubernetes.
 
+### consul_sd_config
+
+Consul SD configurations allow retrieving scrape targets from the [Consul Catalog API](https://www.consul.io).
+When using the Catalog API, each running Promtail will get
+a list of all services known to the whole Consul cluster when discovering
+new targets.
+
+The following meta labels are available on targets during [relabeling](#relabel_configs):
+
+* `__meta_consul_address`: the address of the target
+* `__meta_consul_dc`: the datacenter name for the target
+* `__meta_consul_health`: the health status of the service
+* `__meta_consul_metadata_<key>`: each node metadata key value of the target
+* `__meta_consul_node`: the node name defined for the target
+* `__meta_consul_service_address`: the service address of the target
+* `__meta_consul_service_id`: the service ID of the target
+* `__meta_consul_service_metadata_<key>`: each service metadata key value of the target
+* `__meta_consul_service_port`: the service port of the target
+* `__meta_consul_service`: the name of the service the target belongs to
+* `__meta_consul_tagged_address_<key>`: each node tagged address key value of the target
+* `__meta_consul_tags`: the list of tags of the target joined by the tag separator
+
+```yaml
+# The information to access the Consul Catalog API. It is to be defined
+# as the Consul documentation requires.
+[ server: <host> | default = "localhost:8500" ]
+[ token: <secret> ]
+[ datacenter: <string> ]
+[ scheme: <string> | default = "http" ]
+[ username: <string> ]
+[ password: <secret> ]
+
+tls_config:
+  [ <tls_config> ]
+
+# A list of services for which targets are retrieved. If omitted, all services
+# are scraped.
+services:
+  [ - <string> ]
+
+# See https://www.consul.io/api/catalog.html#list-nodes-for-service to know more
+# about the possible filters that can be used.
+
+# An optional list of tags used to filter nodes for a given service. Services must contain all tags in the list.
+tags:
+  [ - <string> ]
+
+# Node metadata key/value pairs to filter nodes for a given service.
+[ node_meta:
+  [ <string>: <string> ... ] ]
+
+# The string by which Consul tags are joined into the tag label.
+[ tag_separator: <string> | default = , ]
+
+# Allow stale Consul results (see https://www.consul.io/api/features/consistency.html). Will reduce load on Consul.
+[ allow_stale: <boolean> | default = true ]
+
+# The time after which the provided names are refreshed.
+# On a large setup it might be a good idea to increase this value because the catalog will change all the time.
+[ refresh_interval: <duration> | default = 30s ]
+```
+
+Note that the IP address and port number used to scrape the targets is assembled as
+`<__meta_consul_address>:<__meta_consul_service_port>`. However, in some
+Consul setups, the relevant address is in `__meta_consul_service_address`.
+In those cases, you can use the [relabel](#relabel_configs)
+feature to replace the special `__address__` label.
+
+The [relabeling phase](#relabel_configs) is the preferred and more powerful
+way to filter services or nodes for a service based on arbitrary labels. For
+users with thousands of services it can be more efficient to use the Consul API
+directly which has basic support for filtering nodes (currently by node
+metadata and a single tag).
+
+### consulagent_sd_config
+
+Consul Agent SD configurations allow retrieving scrape targets from [Consul's](https://www.consul.io)
+Agent API. When using the Agent API, each running Promtail will only get
+services registered with the local agent running on the same host when discovering
+new targets. This is suitable for very large Consul clusters for which using the
+Catalog API would be too slow or resource intensive.
+
+The following meta labels are available on targets during [relabeling](#relabel_configs):
+
+* `__meta_consulagent_address`: the address of the target
+* `__meta_consulagent_dc`: the datacenter name for the target
+* `__meta_consulagent_health`: the health status of the service
+* `__meta_consulagent_metadata_<key>`: each node metadata key value of the target
+* `__meta_consulagent_node`: the node name defined for the target
+* `__meta_consulagent_service_address`: the service address of the target
+* `__meta_consulagent_service_id`: the service ID of the target
+* `__meta_consulagent_service_metadata_<key>`: each service metadata key value of the target
+* `__meta_consulagent_service_port`: the service port of the target
+* `__meta_consulagent_service`: the name of the service the target belongs to
+* `__meta_consulagent_tagged_address_<key>`: each node tagged address key value of the target
+* `__meta_consulagent_tags`: the list of tags of the target joined by the tag separator
+
+```yaml
+# The information to access the Consul Agent API. It is to be defined
+# as the Consul documentation requires.
+[ server: <host> | default = "localhost:8500" ]
+[ token: <secret> ]
+[ datacenter: <string> ]
+[ scheme: <string> | default = "http" ]
+[ username: <string> ]
+[ password: <secret> ]
+
+tls_config:
+  [ <tls_config> ]
+
+# A list of services for which targets are retrieved. If omitted, all services
+# are scraped.
+services:
+  [ - <string> ]
+
+# See https://www.consul.io/api-docs/agent/service#filtering to know more
+# about the possible filters that can be used.
+
+# An optional list of tags used to filter nodes for a given service. Services must contain all tags in the list.
+tags:
+  [ - <string> ]
+
+# Node metadata key/value pairs to filter nodes for a given service.
+[ node_meta:
+  [ <string>: <string> ... ] ]
+
+# The string by which Consul tags are joined into the tag label.
+[ tag_separator: <string> | default = , ]
+```
+
+Note that the IP address and port number used to scrape the targets is assembled as
+`<__meta_consulagent_address>:<__meta_consulagent_service_port>`. However, in some
+Consul setups, the relevant address is in `__meta_consulagent_service_address`.
+In those cases, you can use the [relabel](#relabel_configs)
+feature to replace the special `__address__` label.
+
+The [relabeling phase](#relabel_configs) is the preferred and more powerful
+way to filter services or nodes for a service based on arbitrary labels. For
+users with thousands of services it can be more efficient to use the Consul
+Agent API directly, which has basic support for filtering nodes (currently by node
+metadata and a single tag).
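+
+As a minimal sketch of how these pieces fit together, the following scrape config
+discovers targets through the local agent and relabels them; the `myapp` service
+name and the log path are illustrative assumptions, not part of this change:
+
+```yaml
+scrape_configs:
+  - job_name: consulagent-example
+    consulagent_sd_configs:
+      - server: 'localhost:8500'
+        services: ['myapp']  # hypothetical service registered with the local agent
+    relabel_configs:
+      # Promtail tails files, so every target needs a __path__ label;
+      # service discovery does not set one on its own.
+      - replacement: '/var/log/myapp/*.log'
+        target_label: '__path__'
+      # Carry the Consul service name over as a label on each log line.
+      - source_labels: ['__meta_consulagent_service']
+        target_label: 'service'
+```
+
+The same relabeling approach works with `consul_sd_configs`, using the
+corresponding `__meta_consul_*` labels instead.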
+ ## target_config The `target_config` block controls the behavior of reading files from discovered diff --git a/go.mod b/go.mod index 01aee64c44506..86c6a0dd2385f 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20191002090509-6af20e3a5340 // indirect github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 + github.com/hashicorp/consul/api v1.8.1 github.com/hashicorp/golang-lru v0.5.4 github.com/hpcloud/tail v1.0.0 github.com/imdario/mergo v0.3.11 @@ -65,6 +66,7 @@ require ( github.com/weaveworks/common v0.0.0-20210419092856-009d1eebd624 go.etcd.io/bbolt v1.3.5 go.uber.org/atomic v1.7.0 + go.uber.org/goleak v1.1.10 golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 golang.org/x/net v0.0.0-20210505214959-0714010a04ed golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 diff --git a/vendor/modules.txt b/vendor/modules.txt index af109839fe2f3..748861ff7cadc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -549,6 +549,7 @@ github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc # github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed github.com/hailocab/go-hostpool # github.com/hashicorp/consul/api v1.8.1 +## explicit github.com/hashicorp/consul/api # github.com/hashicorp/errwrap v1.0.0 github.com/hashicorp/errwrap @@ -1121,6 +1122,7 @@ go.opentelemetry.io/otel/label ## explicit go.uber.org/atomic # go.uber.org/goleak v1.1.10 +## explicit go.uber.org/goleak go.uber.org/goleak/internal/stack # go.uber.org/multierr v1.5.0