From 51d1535c4d9adf786c76e59f1352dd1d1b6a28dd Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Wed, 18 May 2022 17:17:16 -0600 Subject: [PATCH 1/5] Fix delete updates --- docs/sources/configuration/_index.md | 8 +- pkg/loki/delete_store_listener.go | 10 +- pkg/loki/modules.go | 40 ++-- pkg/lokifrontend/config.go | 2 - .../stores/shipper/compactor/compactor.go | 2 + .../deletion/delete_requests_client.go | 174 ++++++++++++++++++ .../deletion/delete_requests_client_test.go | 68 +++++++ .../ksonnet/loki/boltdb_shipper.libsonnet | 1 + production/ksonnet/loki/config.libsonnet | 1 - 9 files changed, 278 insertions(+), 28 deletions(-) create mode 100644 pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go create mode 100644 pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 24d37c8631ca7..c26abbbd50f72 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -376,10 +376,6 @@ The `frontend` block configures the Loki query-frontend. # CLI flag: -frontend.downstream-url [downstream_url: | default = ""] -# Address, including port, where the compactor api is served -# CLI flag: -frontend.compactor-address -[compactor_address: | default = ""] - # Log queries that are slower than the specified duration. Set to 0 to disable. # Set to < 0 to enable on all queries. # CLI flag: -frontend.log-queries-longer-than @@ -2076,6 +2072,10 @@ compacts index shards to more performant forms. # CLI flag: -boltdb.shipper.compactor.deletion-mode [deletion_mode: | default = "whole-stream-deletion"] +# Address, including port, where the compactor api is served +# CLI flag: -boltdb.shipper.compactor.deletion-mode +[compactor_address: | default = ""] + # Maximum number of tables to compact in parallel. # While increasing this value, please make sure compactor has enough disk space # allocated to be able to store and compact as many tables. diff --git a/pkg/loki/delete_store_listener.go b/pkg/loki/delete_store_listener.go index 2a399603784c4..99e67c0893dfe 100644 --- a/pkg/loki/delete_store_listener.go +++ b/pkg/loki/delete_store_listener.go @@ -6,12 +6,12 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" ) -func deleteRequestsStoreListener(d deletion.DeleteRequestsStore) *listener { +func deleteRequestsStoreListener(d deletion.DeleteRequestsClient) *listener { return &listener{d} } type listener struct { - deleteRequestsStore deletion.DeleteRequestsStore + deleteRequestsClient deletion.DeleteRequestsClient } // Starting is called when the service transitions from NEW to STARTING. @@ -26,7 +26,7 @@ func (l *listener) Stopping(from services.State) { // no need to do anything return } - l.deleteRequestsStore.Stop() + l.deleteRequestsClient.Stop() } // Terminated is called when the service transitions to the TERMINATED state. @@ -35,7 +35,7 @@ func (l *listener) Terminated(from services.State) { // no need to do anything return } - l.deleteRequestsStore.Stop() + l.deleteRequestsClient.Stop() } // Failed is called when the service transitions to the FAILED state. @@ -44,5 +44,5 @@ func (l *listener) Failed(from services.State, failure error) { // no need to do anything return } - l.deleteRequestsStore.Stop() + l.deleteRequestsClient.Stop() } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 62506a8e41fb4..460e9778762c7 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -234,7 +234,7 @@ func (t *Loki) initQuerier() (services.Service, error) { // Querier worker's max concurrent requests must be the same as the querier setting t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent - deleteStore, err := t.deleteRequestsStore() + deleteStore, err := t.deleteRequestsClient() if err != nil { return nil, err } @@ -568,17 +568,25 @@ func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) { return deletion.NewNoOpDeleteRequestsStore(), nil } - compactorAddress := t.Cfg.Frontend.CompactorAddress + compactorAddress, err := t.compactorAddress() + if err != nil { + return nil, err + } + + return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) +} + +func (t *Loki) compactorAddress() (string, error) { if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) { // In single binary or read modes, this module depends on Server - compactorAddress = fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort) + return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil } - if compactorAddress == "" { - return nil, errors.New("query filtering for deletes requires 'compactor_address' to be configured") + if t.Cfg.CompactorConfig.Address == "" { + return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured") } - return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) + return t.Cfg.CompactorConfig.Address, nil } func (t *Loki) initQueryFrontend() (_ services.Service, err error) { @@ -736,7 +744,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) { t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - deleteStore, err := t.deleteRequestsStore() + deleteStore, err := t.deleteRequestsClient() if err != nil { return nil, err } @@ -949,7 +957,7 @@ func (t *Loki) initUsageReport() (services.Service, error) { return ur, nil } -func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) { +func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) { // TODO(owen-d): enable delete request storage in tsdb if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) { return deletion.NewNoOpDeleteRequestsStore(), nil @@ -960,16 +968,16 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) { return nil, err } - deleteStore := deletion.NewNoOpDeleteRequestsStore() - if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled { - indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer) - if err != nil { - return nil, err - } + if !filteringEnabled { + return deletion.NewNoOpDeleteRequestsStore(), nil + } - deleteStore = deletion.NewDeleteStoreFromIndexClient(indexClient) + compactorAddress, err := t.compactorAddress() + if err != nil { + return nil, err } - return deleteStore, nil + + return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) } func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) { diff --git a/pkg/lokifrontend/config.go b/pkg/lokifrontend/config.go index b4d18c84cb681..f7cb5475f3bc6 100644 --- a/pkg/lokifrontend/config.go +++ b/pkg/lokifrontend/config.go @@ -15,7 +15,6 @@ type Config struct { CompressResponses bool `yaml:"compress_responses"` DownstreamURL string `yaml:"downstream_url"` - CompactorAddress string `yaml:"compactor_address"` TailProxyURL string `yaml:"tail_proxy_url"` } @@ -28,6 +27,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.") f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.") - f.StringVar(&cfg.CompactorAddress, "frontend.compactor-address", "", "host and port where the compactor API is listening") f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.") } diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index a6e250bae7174..e5943f2429d80 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -72,6 +72,7 @@ type Config struct { DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"` + Address string `yaml:"compactor_address,omitempty"` } // RegisterFlags registers flags. @@ -87,6 +88,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|"))) + f.StringVar(&cfg.Address, "boltdb.shipper.compactor.address", "", "host and port where the compactor API is listening") cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f) } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go new file mode 100644 index 0000000000000..99cb93008e847 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go @@ -0,0 +1,174 @@ +package deletion + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "sync" + "time" + + "github.com/go-kit/log/level" + + "github.com/grafana/loki/pkg/util/log" +) + +const ( + orgHeaderKey = "X-Scope-OrgID" + getDeletePath = "/loki/api/v1/delete" +) + +type DeleteRequestsClient interface { + GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) + Stop() +} + +type deleteRequestsClient struct { + url string + httpClient doer + mu sync.RWMutex + + cache map[string][]DeleteRequest + cacheDuration time.Duration + + stopChan chan struct{} +} + +type doer interface { + Do(*http.Request) (*http.Response, error) +} + +type DeleteRequestsStoreOption func(c *deleteRequestsClient) + +func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { + return func(c *deleteRequestsClient) { + c.cacheDuration = d + } +} + +func NewDeleteRequestsClient(addr string, c doer, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { + u, err := url.Parse(addr) + if err != nil { + level.Error(log.Logger).Log("msg", "error parsing url", "err", err) + return nil, err + } + u.Path = getDeletePath + + client := &deleteRequestsClient{ + url: u.String(), + httpClient: c, + cacheDuration: 5 * time.Minute, + cache: make(map[string][]DeleteRequest), + stopChan: make(chan struct{}), + } + + for _, o := range opts { + o(client) + } + + go client.updateLoop() + return client, nil +} + +func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { + if cachedRequests, ok := c.getCachedRequests(userID); ok { + return cachedRequests, nil + } + + requests, err := c.getRequestsFromServer(ctx, userID) + if err != nil { + return nil, err + } + + c.mu.Lock() + defer c.mu.Unlock() + c.cache[userID] = requests + + return requests, nil +} + +func (c *deleteRequestsClient) getCachedRequests(userID string) ([]DeleteRequest, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + res, ok := c.cache[userID] + return res, ok +} + +func (c *deleteRequestsClient) Stop() { + close(c.stopChan) +} + +func (c *deleteRequestsClient) updateLoop() { + t := time.NewTicker(c.cacheDuration) + for { + select { + case <-t.C: + c.updateCache() + case <-c.stopChan: + return + } + } +} + +func (c *deleteRequestsClient) updateCache() { + userIDs := c.currentUserIDs() + + newCache := make(map[string][]DeleteRequest) + for _, userID := range userIDs { + deleteReq, err := c.getRequestsFromServer(context.Background(), userID) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + continue + } + newCache[userID] = deleteReq + } + + c.mu.Lock() + defer c.mu.Unlock() + c.cache = newCache +} + +func (c *deleteRequestsClient) currentUserIDs() []string { + c.mu.RLock() + defer c.mu.RUnlock() + + userIDs := make([]string, 0, len(c.cache)) + for userID := range c.cache { + userIDs = append(userIDs, userID) + } + + return userIDs +} + +func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + req.Header.Set(orgHeaderKey, userID) + + resp, err := c.httpClient.Do(req) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + var deleteRequests []DeleteRequest + if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil { + level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) + return nil, err + } + + return deleteRequests, nil +} diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go new file mode 100644 index 0000000000000..f8f62ed964bea --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go @@ -0,0 +1,68 @@ +package deletion + +import ( + "context" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestGetCacheGenNumberForUser(t *testing.T) { + t.Run("it requests results from the api", func(t *testing.T) { + httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} + client, err := NewDeleteRequestsClient("http://test-server", httpClient) + require.Nil(t, err) + + deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") + require.Nil(t, err) + + require.Len(t, deleteRequests, 1) + require.Equal(t, "test-request", deleteRequests[0].RequestID) + + require.Equal(t, "http://test-server/loki/api/v1/delete", httpClient.req.URL.String()) + require.Equal(t, http.MethodGet, httpClient.req.Method) + require.Equal(t, "userID", httpClient.req.Header.Get("X-Scope-OrgID")) + }) + + t.Run("it caches the results", func(t *testing.T) { + httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} + client, err := NewDeleteRequestsClient("http://test-server", httpClient, WithRequestClientCacheDuration(100*time.Millisecond)) + require.Nil(t, err) + + deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") + require.Nil(t, err) + require.Equal(t, "test-request", deleteRequests[0].RequestID) + + httpClient.ret = `[{"request_id":"different"}]` + + deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID") + require.Nil(t, err) + require.Equal(t, "test-request", deleteRequests[0].RequestID) + + time.Sleep(200 * time.Millisecond) + + deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID") + require.Nil(t, err) + require.Equal(t, "different", deleteRequests[0].RequestID) + + client.Stop() + }) +} + +type mockHTTPClient struct { + ret string + req *http.Request +} + +func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) { + c.req = req + + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(strings.NewReader(c.ret)), + }, nil +} diff --git a/production/ksonnet/loki/boltdb_shipper.libsonnet b/production/ksonnet/loki/boltdb_shipper.libsonnet index 31f67f748a2bc..31235ed463c07 100644 --- a/production/ksonnet/loki/boltdb_shipper.libsonnet +++ b/production/ksonnet/loki/boltdb_shipper.libsonnet @@ -30,6 +30,7 @@ compactor+: { working_directory: '/data/compactor', shared_store: $._config.boltdb_shipper_shared_store, + compactor_address: 'http://compactor.%s.svc.cluster.local:%d' % [$._config.namespace, $._config.http_listen_port], }, } else {}, }, diff --git a/production/ksonnet/loki/config.libsonnet b/production/ksonnet/loki/config.libsonnet index cb9b56bd3d265..ad2a5fee3febf 100644 --- a/production/ksonnet/loki/config.libsonnet +++ b/production/ksonnet/loki/config.libsonnet @@ -158,7 +158,6 @@ frontend: { compress_responses: true, log_queries_longer_than: '5s', - compactor_address: 'http://compactor.%s.svc.cluster.local:%d' % [$._config.namespace, $._config.http_listen_port], }, frontend_worker: { match_max_concurrent: true, From 40e9df36cf6d8474c3964ee6722efac812566b74 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Mon, 23 May 2022 08:37:33 -0600 Subject: [PATCH 2/5] Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> --- .../stores/shipper/compactor/deletion/delete_requests_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go index 99cb93008e847..61c0d80d135ba 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go @@ -158,7 +158,7 @@ func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { + if resp.StatusCode/100 != 2 { err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) return nil, err From 8b73fbd123c43a142261527b492443ed2a0bea5e Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Wed, 25 May 2022 11:26:35 -0600 Subject: [PATCH 3/5] Add compactor address to the storage config --- pkg/loki/modules.go | 4 ++-- pkg/storage/factory.go | 3 +++ pkg/storage/stores/shipper/compactor/compactor.go | 2 -- production/ksonnet/loki/boltdb_shipper.libsonnet | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 9cad1fb6c16f2..dc25069443037 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -589,11 +589,11 @@ func (t *Loki) compactorAddress() (string, error) { return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil } - if t.Cfg.CompactorConfig.Address == "" { + if t.Cfg.StorageConfig.CompactorAddress == "" { return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured") } - return t.Cfg.CompactorConfig.Address, nil + return t.Cfg.StorageConfig.CompactorAddress, nil } func (t *Loki) initQueryFrontend() (_ services.Service, err error) { diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 22c4c57bfe27f..d4ae54cc5e9e6 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -69,6 +69,8 @@ type Config struct { BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper"` TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper"` + CompactorAddress string `yaml:"compactor_address"` + // Config for using AsyncStore when using async index stores like `boltdb-shipper`. // It is required for getting chunk ids of recently flushed chunks from the ingesters. EnableAsyncStore bool `yaml:"-"` @@ -95,6 +97,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxParallelGetChunk, "store.max-parallel-get-chunk", 150, "Maximum number of parallel chunk reads.") cfg.BoltDBShipperConfig.RegisterFlags(f) f.IntVar(&cfg.MaxChunkBatchSize, "store.max-chunk-batch-size", 50, "The maximum number of chunks to fetch per batch.") + f.StringVar(&cfg.CompactorAddress, "store.compactor-address", "", "address of the compactor in the form 'http://host:port'") cfg.TSDBShipperConfig.RegisterFlagsWithPrefix("tsdb.", f) } diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index e5943f2429d80..a6e250bae7174 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -72,7 +72,6 @@ type Config struct { DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"` - Address string `yaml:"compactor_address,omitempty"` } // RegisterFlags registers flags. @@ -88,7 +87,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|"))) - f.StringVar(&cfg.Address, "boltdb.shipper.compactor.address", "", "host and port where the compactor API is listening") cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f) } diff --git a/production/ksonnet/loki/boltdb_shipper.libsonnet b/production/ksonnet/loki/boltdb_shipper.libsonnet index 31235ed463c07..2a6cdfd71f20b 100644 --- a/production/ksonnet/loki/boltdb_shipper.libsonnet +++ b/production/ksonnet/loki/boltdb_shipper.libsonnet @@ -26,11 +26,11 @@ active_index_directory: '/data/index', cache_location: '/data/boltdb-cache', }, + compactor_address: 'http://compactor.%s.svc.cluster.local:%d' % [$._config.namespace, $._config.http_listen_port], }, compactor+: { working_directory: '/data/compactor', shared_store: $._config.boltdb_shipper_shared_store, - compactor_address: 'http://compactor.%s.svc.cluster.local:%d' % [$._config.namespace, $._config.http_listen_port], }, } else {}, }, From c6c27d2c94640c8148c1576cc4099a7b0e61e624 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Wed, 25 May 2022 14:21:08 -0600 Subject: [PATCH 4/5] review comments --- docs/sources/configuration/_index.md | 8 ++++---- pkg/loki/modules.go | 2 +- .../shipper/compactor/deletion/delete_requests_client.go | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index bf54327dfaee4..16aa2893f43db 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2079,10 +2079,6 @@ compacts index shards to more performant forms. # CLI flag: -boltdb.shipper.compactor.deletion-mode [deletion_mode: | default = "whole-stream-deletion"] -# Address, including port, where the compactor api is served -# CLI flag: -boltdb.shipper.compactor.deletion-mode -[compactor_address: | default = ""] - # Maximum number of tables to compact in parallel. # While increasing this value, please make sure compactor has enough disk space # allocated to be able to store and compact as many tables. @@ -2208,6 +2204,10 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -store.max-query-length [max_query_length: | default = 721h] +# Address, including port, where the compactor api is served +# CLI flag: -store.compactor-address +[compactor_address: | default = ""] + # Maximum number of queries that will be scheduled in parallel by the frontend. # CLI flag: -querier.max-query-parallelism [max_query_parallelism: | default = 32] diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index dc25069443037..ecbbeb6ec6189 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -968,7 +968,7 @@ func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) { return nil, err } - if !filteringEnabled { + if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go index 61c0d80d135ba..bde272b520acc 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go @@ -26,7 +26,7 @@ type DeleteRequestsClient interface { type deleteRequestsClient struct { url string - httpClient doer + httpClient httpClient mu sync.RWMutex cache map[string][]DeleteRequest @@ -35,7 +35,7 @@ type deleteRequestsClient struct { stopChan chan struct{} } -type doer interface { +type httpClient interface { Do(*http.Request) (*http.Response, error) } @@ -47,7 +47,7 @@ func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { } } -func NewDeleteRequestsClient(addr string, c doer, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { +func NewDeleteRequestsClient(addr string, c httpClient, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { u, err := url.Parse(addr) if err != nil { level.Error(log.Logger).Log("msg", "error parsing url", "err", err) From 41cffbcf5223f05603894eb805be8e82e2f3d152 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Thu, 26 May 2022 12:10:39 -0600 Subject: [PATCH 5/5] Review feedback --- docs/sources/configuration/_index.md | 8 ++++---- pkg/loki/common/common.go | 7 ++++++- pkg/loki/modules.go | 4 ++-- pkg/storage/factory.go | 3 --- production/ksonnet/loki/boltdb_shipper.libsonnet | 1 - production/ksonnet/loki/config.libsonnet | 3 +++ 6 files changed, 15 insertions(+), 11 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 16aa2893f43db..4bdf2a5b063c6 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2204,10 +2204,6 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -store.max-query-length [max_query_length: | default = 721h] -# Address, including port, where the compactor api is served -# CLI flag: -store.compactor-address -[compactor_address: | default = ""] - # Maximum number of queries that will be scheduled in parallel by the frontend. # CLI flag: -querier.max-query-parallelism [max_query_parallelism: | default = 32] @@ -2600,6 +2596,10 @@ This way, one doesn't have to replicate configuration in multiple places. # to be used by the distributor's ring, but only if the distributor's ring itself # doesn't have a `heartbeat_period` set. [ring: ] + +# Address, including port, where the compactor api is served +# CLI flag: -common.compactor-address +[compactor_address: | default = ""] ``` ## analytics diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index 6c296027419cc..721ffed9d12e3 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -40,9 +40,12 @@ type Config struct { // You can check this during Loki execution under ring status pages (ex: `/ring` will output the address of the different ingester // instances). InstanceAddr string `yaml:"instance_addr"` + + // CompactorAddress is the http address of the compactor in the form http://host:port + CompactorAddress string `yaml:"compactor_address"` } -func (c *Config) RegisterFlags(_ *flag.FlagSet) { +func (c *Config) RegisterFlags(f *flag.FlagSet) { throwaway := flag.NewFlagSet("throwaway", flag.PanicOnError) throwaway.IntVar(&c.ReplicationFactor, "common.replication-factor", 3, "How many ingesters incoming data should be replicated to.") c.Storage.RegisterFlagsWithPrefix("common.storage", throwaway) @@ -52,6 +55,8 @@ func (c *Config) RegisterFlags(_ *flag.FlagSet) { c.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger) throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.") throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.") + + f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port") } type Storage struct { diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index ecbbeb6ec6189..1d6e28315000f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -589,11 +589,11 @@ func (t *Loki) compactorAddress() (string, error) { return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil } - if t.Cfg.StorageConfig.CompactorAddress == "" { + if t.Cfg.Common.CompactorAddress == "" { return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured") } - return t.Cfg.StorageConfig.CompactorAddress, nil + return t.Cfg.Common.CompactorAddress, nil } func (t *Loki) initQueryFrontend() (_ services.Service, err error) { diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index d4ae54cc5e9e6..22c4c57bfe27f 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -69,8 +69,6 @@ type Config struct { BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper"` TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper"` - CompactorAddress string `yaml:"compactor_address"` - // Config for using AsyncStore when using async index stores like `boltdb-shipper`. // It is required for getting chunk ids of recently flushed chunks from the ingesters. EnableAsyncStore bool `yaml:"-"` @@ -97,7 +95,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxParallelGetChunk, "store.max-parallel-get-chunk", 150, "Maximum number of parallel chunk reads.") cfg.BoltDBShipperConfig.RegisterFlags(f) f.IntVar(&cfg.MaxChunkBatchSize, "store.max-chunk-batch-size", 50, "The maximum number of chunks to fetch per batch.") - f.StringVar(&cfg.CompactorAddress, "store.compactor-address", "", "address of the compactor in the form 'http://host:port'") cfg.TSDBShipperConfig.RegisterFlagsWithPrefix("tsdb.", f) } diff --git a/production/ksonnet/loki/boltdb_shipper.libsonnet b/production/ksonnet/loki/boltdb_shipper.libsonnet index 2a6cdfd71f20b..31f67f748a2bc 100644 --- a/production/ksonnet/loki/boltdb_shipper.libsonnet +++ b/production/ksonnet/loki/boltdb_shipper.libsonnet @@ -26,7 +26,6 @@ active_index_directory: '/data/index', cache_location: '/data/boltdb-cache', }, - compactor_address: 'http://compactor.%s.svc.cluster.local:%d' % [$._config.namespace, $._config.http_listen_port], }, compactor+: { working_directory: '/data/compactor', diff --git a/production/ksonnet/loki/config.libsonnet b/production/ksonnet/loki/config.libsonnet index ad2a5fee3febf..e2f02285decd6 100644 --- a/production/ksonnet/loki/config.libsonnet +++ b/production/ksonnet/loki/config.libsonnet @@ -144,6 +144,9 @@ commonEnvs: [], loki: { + common: { + compactor_address: 'http://compactor.%s.svc.cluster.local.:%d' % [$._config.namespace, $._config.http_listen_port], + }, server: { graceful_shutdown_timeout: '5s', http_server_idle_timeout: '120s',