diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 31bbb3515227c..623c206af33d2 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -379,10 +379,6 @@ The `frontend` block configures the Loki query-frontend. # CLI flag: -frontend.downstream-url [downstream_url: | default = ""] -# Address, including port, where the compactor api is served -# CLI flag: -frontend.compactor-address -[compactor_address: | default = ""] - # Log queries that are slower than the specified duration. Set to 0 to disable. # Set to < 0 to enable on all queries. # CLI flag: -frontend.log-queries-longer-than @@ -2604,6 +2600,10 @@ This way, one doesn't have to replicate configuration in multiple places. # to be used by the distributor's ring, but only if the distributor's ring itself # doesn't have a `heartbeat_period` set. [ring: ] + +# Address, including port, where the compactor api is served +# CLI flag: -common.compactor-address +[compactor_address: | default = ""] ``` ## analytics diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index 6c296027419cc..721ffed9d12e3 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -40,9 +40,12 @@ type Config struct { // You can check this during Loki execution under ring status pages (ex: `/ring` will output the address of the different ingester // instances). InstanceAddr string `yaml:"instance_addr"` + + // CompactorAddress is the http address of the compactor in the form http://host:port + CompactorAddress string `yaml:"compactor_address"` } -func (c *Config) RegisterFlags(_ *flag.FlagSet) { +func (c *Config) RegisterFlags(f *flag.FlagSet) { throwaway := flag.NewFlagSet("throwaway", flag.PanicOnError) throwaway.IntVar(&c.ReplicationFactor, "common.replication-factor", 3, "How many ingesters incoming data should be replicated to.") c.Storage.RegisterFlagsWithPrefix("common.storage", throwaway) @@ -52,6 +55,8 @@ func (c *Config) RegisterFlags(_ *flag.FlagSet) { c.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger) throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.") throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.") + + f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port") } type Storage struct { diff --git a/pkg/loki/delete_store_listener.go b/pkg/loki/delete_store_listener.go index 2a399603784c4..99e67c0893dfe 100644 --- a/pkg/loki/delete_store_listener.go +++ b/pkg/loki/delete_store_listener.go @@ -6,12 +6,12 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" ) -func deleteRequestsStoreListener(d deletion.DeleteRequestsStore) *listener { +func deleteRequestsStoreListener(d deletion.DeleteRequestsClient) *listener { return &listener{d} } type listener struct { - deleteRequestsStore deletion.DeleteRequestsStore + deleteRequestsClient deletion.DeleteRequestsClient } // Starting is called when the service transitions from NEW to STARTING. @@ -26,7 +26,7 @@ func (l *listener) Stopping(from services.State) { // no need to do anything return } - l.deleteRequestsStore.Stop() + l.deleteRequestsClient.Stop() } // Terminated is called when the service transitions to the TERMINATED state. @@ -35,7 +35,7 @@ func (l *listener) Terminated(from services.State) { // no need to do anything return } - l.deleteRequestsStore.Stop() + l.deleteRequestsClient.Stop() } // Failed is called when the service transitions to the FAILED state. @@ -44,5 +44,5 @@ func (l *listener) Failed(from services.State, failure error) { // no need to do anything return } - l.deleteRequestsStore.Stop() + l.deleteRequestsClient.Stop() } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index de9ff99dfbcb7..1d6e28315000f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -241,7 +241,7 @@ func (t *Loki) initQuerier() (services.Service, error) { // Querier worker's max concurrent requests must be the same as the querier setting t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent - deleteStore, err := t.deleteRequestsStore() + deleteStore, err := t.deleteRequestsClient() if err != nil { return nil, err } @@ -575,17 +575,25 @@ func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) { return deletion.NewNoOpDeleteRequestsStore(), nil } - compactorAddress := t.Cfg.Frontend.CompactorAddress + compactorAddress, err := t.compactorAddress() + if err != nil { + return nil, err + } + + return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) +} + +func (t *Loki) compactorAddress() (string, error) { if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) { // In single binary or read modes, this module depends on Server - compactorAddress = fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort) + return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil } - if compactorAddress == "" { - return nil, errors.New("query filtering for deletes requires 'compactor_address' to be configured") + if t.Cfg.Common.CompactorAddress == "" { + return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured") } - return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) + return t.Cfg.Common.CompactorAddress, nil } func (t *Loki) initQueryFrontend() (_ services.Service, err error) { @@ -742,7 +750,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) { t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort - deleteStore, err := t.deleteRequestsStore() + deleteStore, err := t.deleteRequestsClient() if err != nil { return nil, err } @@ -949,7 +957,7 @@ func (t *Loki) initUsageReport() (services.Service, error) { return ur, nil } -func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) { +func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) { // TODO(owen-d): enable delete request storage in tsdb if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) { return deletion.NewNoOpDeleteRequestsStore(), nil @@ -960,16 +968,16 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) { return nil, err } - deleteStore := deletion.NewNoOpDeleteRequestsStore() - if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled { - indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, nil, prometheus.DefaultRegisterer) - if err != nil { - return nil, err - } + if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled { + return deletion.NewNoOpDeleteRequestsStore(), nil + } - deleteStore = deletion.NewDeleteStoreFromIndexClient(indexClient) + compactorAddress, err := t.compactorAddress() + if err != nil { + return nil, err } - return deleteStore, nil + + return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) } func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) { diff --git a/pkg/lokifrontend/config.go b/pkg/lokifrontend/config.go index b4d18c84cb681..f7cb5475f3bc6 100644 --- a/pkg/lokifrontend/config.go +++ b/pkg/lokifrontend/config.go @@ -15,7 +15,6 @@ type Config struct { CompressResponses bool `yaml:"compress_responses"` DownstreamURL string `yaml:"downstream_url"` - CompactorAddress string `yaml:"compactor_address"` TailProxyURL string `yaml:"tail_proxy_url"` } @@ -28,6 +27,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.") f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.") - f.StringVar(&cfg.CompactorAddress, "frontend.compactor-address", "", "host and port where the compactor API is listening") f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.") } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go new file mode 100644 index 0000000000000..bde272b520acc --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go @@ -0,0 +1,174 @@ +package deletion + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "sync" + "time" + + "github.com/go-kit/log/level" + + "github.com/grafana/loki/pkg/util/log" +) + +const ( + orgHeaderKey = "X-Scope-OrgID" + getDeletePath = "/loki/api/v1/delete" +) + +type DeleteRequestsClient interface { + GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) + Stop() +} + +type deleteRequestsClient struct { + url string + httpClient httpClient + mu sync.RWMutex + + cache map[string][]DeleteRequest + cacheDuration time.Duration + + stopChan chan struct{} +} + +type httpClient interface { + Do(*http.Request) (*http.Response, error) +} + +type DeleteRequestsStoreOption func(c *deleteRequestsClient) + +func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { + return func(c *deleteRequestsClient) { + c.cacheDuration = d + } +} + +func NewDeleteRequestsClient(addr string, c httpClient, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { + u, err := url.Parse(addr) + if err != nil { + level.Error(log.Logger).Log("msg", "error parsing url", "err", err) + return nil, err + } + u.Path = getDeletePath + + client := &deleteRequestsClient{ + url: u.String(), + httpClient: c, + cacheDuration: 5 * time.Minute, + cache: make(map[string][]DeleteRequest), + stopChan: make(chan struct{}), + } + + for _, o := range opts { + o(client) + } + + go client.updateLoop() + return client, nil +} + +func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { + if cachedRequests, ok := c.getCachedRequests(userID); ok { + return cachedRequests, nil + } + + requests, err := c.getRequestsFromServer(ctx, userID) + if err != nil { + return nil, err + } + + c.mu.Lock() + defer c.mu.Unlock() + c.cache[userID] = requests + + return requests, nil +} + +func (c *deleteRequestsClient) getCachedRequests(userID string) ([]DeleteRequest, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + res, ok := c.cache[userID] + return res, ok +} + +func (c *deleteRequestsClient) Stop() { + close(c.stopChan) +} + +func (c *deleteRequestsClient) updateLoop() { + t := time.NewTicker(c.cacheDuration) + for { + select { + case <-t.C: + c.updateCache() + case <-c.stopChan: + return + } + } +} + +func (c *deleteRequestsClient) updateCache() { + userIDs := c.currentUserIDs() + + newCache := make(map[string][]DeleteRequest) + for _, userID := range userIDs { + deleteReq, err := c.getRequestsFromServer(context.Background(), userID) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + continue + } + newCache[userID] = deleteReq + } + + c.mu.Lock() + defer c.mu.Unlock() + c.cache = newCache +} + +func (c *deleteRequestsClient) currentUserIDs() []string { + c.mu.RLock() + defer c.mu.RUnlock() + + userIDs := make([]string, 0, len(c.cache)) + for userID := range c.cache { + userIDs = append(userIDs, userID) + } + + return userIDs +} + +func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + req.Header.Set(orgHeaderKey, userID) + + resp, err := c.httpClient.Do(req) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + var deleteRequests []DeleteRequest + if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil { + level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) + return nil, err + } + + return deleteRequests, nil +} diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go new file mode 100644 index 0000000000000..f8f62ed964bea --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go @@ -0,0 +1,68 @@ +package deletion + +import ( + "context" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestGetCacheGenNumberForUser(t *testing.T) { + t.Run("it requests results from the api", func(t *testing.T) { + httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} + client, err := NewDeleteRequestsClient("http://test-server", httpClient) + require.Nil(t, err) + + deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") + require.Nil(t, err) + + require.Len(t, deleteRequests, 1) + require.Equal(t, "test-request", deleteRequests[0].RequestID) + + require.Equal(t, "http://test-server/loki/api/v1/delete", httpClient.req.URL.String()) + require.Equal(t, http.MethodGet, httpClient.req.Method) + require.Equal(t, "userID", httpClient.req.Header.Get("X-Scope-OrgID")) + }) + + t.Run("it caches the results", func(t *testing.T) { + httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} + client, err := NewDeleteRequestsClient("http://test-server", httpClient, WithRequestClientCacheDuration(100*time.Millisecond)) + require.Nil(t, err) + + deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") + require.Nil(t, err) + require.Equal(t, "test-request", deleteRequests[0].RequestID) + + httpClient.ret = `[{"request_id":"different"}]` + + deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID") + require.Nil(t, err) + require.Equal(t, "test-request", deleteRequests[0].RequestID) + + time.Sleep(200 * time.Millisecond) + + deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID") + require.Nil(t, err) + require.Equal(t, "different", deleteRequests[0].RequestID) + + client.Stop() + }) +} + +type mockHTTPClient struct { + ret string + req *http.Request +} + +func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) { + c.req = req + + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(strings.NewReader(c.ret)), + }, nil +} diff --git a/production/ksonnet/loki/config.libsonnet b/production/ksonnet/loki/config.libsonnet index cb9b56bd3d265..e2f02285decd6 100644 --- a/production/ksonnet/loki/config.libsonnet +++ b/production/ksonnet/loki/config.libsonnet @@ -144,6 +144,9 @@ commonEnvs: [], loki: { + common: { + compactor_address: 'http://compactor.%s.svc.cluster.local.:%d' % [$._config.namespace, $._config.http_listen_port], + }, server: { graceful_shutdown_timeout: '5s', http_server_idle_timeout: '120s', @@ -158,7 +161,6 @@ frontend: { compress_responses: true, log_queries_longer_than: '5s', - compactor_address: 'http://compactor.%s.svc.cluster.local:%d' % [$._config.namespace, $._config.http_listen_port], }, frontend_worker: { match_max_concurrent: true,