Fix delete updates #6194

Merged

Changes from 3 commits
8 changes: 4 additions & 4 deletions docs/sources/configuration/_index.md
@@ -379,10 +379,6 @@ The `frontend` block configures the Loki query-frontend.
# CLI flag: -frontend.downstream-url
[downstream_url: <string> | default = ""]

# Address, including port, where the compactor api is served
# CLI flag: -frontend.compactor-address
[compactor_address: <string> | default = ""]

# Log queries that are slower than the specified duration. Set to 0 to disable.
# Set to < 0 to enable on all queries.
# CLI flag: -frontend.log-queries-longer-than
@@ -2083,6 +2079,10 @@ compacts index shards to more performant forms.
# CLI flag: -boltdb.shipper.compactor.deletion-mode
[deletion_mode: <string> | default = "whole-stream-deletion"]

# Address, including port, where the compactor api is served
# CLI flag: -boltdb.shipper.compactor.address
[compactor_address: <string> | default = ""]

# Maximum number of tables to compact in parallel.
# While increasing this value, please make sure compactor has enough disk space
# allocated to be able to store and compact as many tables.
10 changes: 5 additions & 5 deletions pkg/loki/delete_store_listener.go
@@ -6,12 +6,12 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
)

func deleteRequestsStoreListener(d deletion.DeleteRequestsStore) *listener {
func deleteRequestsStoreListener(d deletion.DeleteRequestsClient) *listener {
return &listener{d}
}

type listener struct {
deleteRequestsStore deletion.DeleteRequestsStore
deleteRequestsClient deletion.DeleteRequestsClient
}

// Starting is called when the service transitions from NEW to STARTING.
@@ -26,7 +26,7 @@ func (l *listener) Stopping(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}

// Terminated is called when the service transitions to the TERMINATED state.
@@ -35,7 +35,7 @@ func (l *listener) Terminated(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}

// Failed is called when the service transitions to the FAILED state.
@@ -44,5 +44,5 @@ func (l *listener) Failed(from services.State, failure error) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}
40 changes: 24 additions & 16 deletions pkg/loki/modules.go
@@ -233,7 +233,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
@@ -569,17 +569,25 @@ func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

compactorAddress := t.Cfg.Frontend.CompactorAddress
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}

func (t *Loki) compactorAddress() (string, error) {
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
compactorAddress = fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort)
return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil
}

if compactorAddress == "" {
return nil, errors.New("query filtering for deletes requires 'compactor_address' to be configured")
if t.Cfg.CompactorConfig.Address == "" {
return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured")
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
return t.Cfg.CompactorConfig.Address, nil
}

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
@@ -737,7 +745,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV

deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
@@ -940,7 +948,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}

func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
@@ -951,16 +959,16 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
return nil, err
}

deleteStore := deletion.NewNoOpDeleteRequestsStore()
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled {
sandeepsukhani marked this conversation as resolved.
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, nil, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
if !filteringEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

deleteStore = deletion.NewDeleteStoreFromIndexClient(indexClient)
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}
return deleteStore, nil

return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}

func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
2 changes: 0 additions & 2 deletions pkg/lokifrontend/config.go
@@ -15,7 +15,6 @@ type Config struct {

CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream_url"`
CompactorAddress string `yaml:"compactor_address"`

TailProxyURL string `yaml:"tail_proxy_url"`
}
@@ -28,6 +27,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
f.StringVar(&cfg.CompactorAddress, "frontend.compactor-address", "", "host and port where the compactor API is listening")
f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.")
}
2 changes: 2 additions & 0 deletions pkg/storage/stores/shipper/compactor/compactor.go
@@ -72,6 +72,7 @@ type Config struct {
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"`
Address string `yaml:"compactor_address,omitempty"`
}

// RegisterFlags registers flags.
@@ -87,6 +88,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|")))
f.StringVar(&cfg.Address, "boltdb.shipper.compactor.address", "", "host and port where the compactor API is listening")
Contributor

Adding this config to the compactor looks out of place to me. New users already get confused by our configs, so it is better to duplicate it a little than add to the confusion. It could be just me. What do you think?

Contributor Author

I went back and forth on this. If we leave it off the compactor config, it becomes a required property on:

  • frontend
  • querier
  • ruler

Do you think that's clearer for the user than having the compactor know its own address?

Contributor

What do you think about making it part of the storage config? It is related to storage anyway.

Collaborator

is there also a case to be made for the common config, since it's common across multiple components?

Contributor Author

Common Config says // Values defined under this common configuration are supersede if a more specific value is defined.

Does that imply that there's also some more specific override elsewhere? If not, it would be a fine place for this to go.

Collaborator

I don't think that would apply in this case: if we moved it to the common config so that was the only place it's defined, then there would be no superseding. What that comment is meant to convey is that after things from the common config (like storage or ring configs) are applied to the many places they are used, the config.yaml is read again, so if there is, say, a specific ring config for the scheduler, that will supersede whatever was put in common.

Contributor

yeah, I thought about using common config but due to that comment I thought it was not the right place, same as Travis. If we can use it for this use case then it would be a better option.
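
As an illustration of the duplication the author describes above, here is a minimal, hypothetical Go sketch; it is not Loki code, and the helper name and flag prefixes are made up purely to show what registering the same setting on each consumer would look like.

```go
package main

import (
	"flag"
	"fmt"
)

// registerCompactorAddress shows the duplication: every component that needs
// to reach the compactor registers its own copy of the same flag.
func registerCompactorAddress(f *flag.FlagSet, prefix string, target *string) {
	f.StringVar(target, prefix+".compactor-address", "", "host and port where the compactor API is listening")
}

func main() {
	var frontendAddr, querierAddr, rulerAddr string
	fs := flag.NewFlagSet("loki", flag.ExitOnError)
	registerCompactorAddress(fs, "frontend", &frontendAddr)
	registerCompactorAddress(fs, "querier", &querierAddr)
	registerCompactorAddress(fs, "ruler", &rulerAddr)

	// Each deployment would then have to set (and keep in sync) up to three values.
	_ = fs.Parse([]string{"-frontend.compactor-address=http://compactor:3100"})
	fmt.Println(frontendAddr, querierAddr, rulerAddr)
}
```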

cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f)
}

@@ -0,0 +1,174 @@
package deletion

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sync"
"time"

"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/util/log"
)

const (
orgHeaderKey = "X-Scope-OrgID"
getDeletePath = "/loki/api/v1/delete"
)

type DeleteRequestsClient interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
Stop()
}

type deleteRequestsClient struct {
url string
httpClient doer
mu sync.RWMutex

cache map[string][]DeleteRequest
cacheDuration time.Duration

stopChan chan struct{}
}

type doer interface {
sandeepsukhani marked this conversation as resolved.
Do(*http.Request) (*http.Response, error)
}

type DeleteRequestsStoreOption func(c *deleteRequestsClient)

func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption {
return func(c *deleteRequestsClient) {
c.cacheDuration = d
}
}

func NewDeleteRequestsClient(addr string, c doer, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) {
Contributor

Instead of taking an HTTP client as a parameter, what do you think about accepting a config?

Contributor Author

I'd prefer to take the client, if it's not too disruptive. I really like this pattern because it makes it very easy to unit test without worrying about the network: pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go

u, err := url.Parse(addr)
if err != nil {
level.Error(log.Logger).Log("msg", "error parsing url", "err", err)
return nil, err
}
u.Path = getDeletePath

client := &deleteRequestsClient{
url: u.String(),
httpClient: c,
cacheDuration: 5 * time.Minute,
cache: make(map[string][]DeleteRequest),
stopChan: make(chan struct{}),
}

for _, o := range opts {
o(client)
}

go client.updateLoop()
return client, nil
}

func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
if cachedRequests, ok := c.getCachedRequests(userID); ok {
return cachedRequests, nil
}

requests, err := c.getRequestsFromServer(ctx, userID)
if err != nil {
return nil, err
}

c.mu.Lock()
defer c.mu.Unlock()
c.cache[userID] = requests

return requests, nil
}

func (c *deleteRequestsClient) getCachedRequests(userID string) ([]DeleteRequest, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

res, ok := c.cache[userID]
return res, ok
}

func (c *deleteRequestsClient) Stop() {
close(c.stopChan)
}

func (c *deleteRequestsClient) updateLoop() {
t := time.NewTicker(c.cacheDuration)
for {
select {
case <-t.C:
c.updateCache()
case <-c.stopChan:
return
}
}
}

func (c *deleteRequestsClient) updateCache() {
userIDs := c.currentUserIDs()

newCache := make(map[string][]DeleteRequest)
for _, userID := range userIDs {
deleteReq, err := c.getRequestsFromServer(context.Background(), userID)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
continue
}
newCache[userID] = deleteReq
}

c.mu.Lock()
defer c.mu.Unlock()
c.cache = newCache
}

func (c *deleteRequestsClient) currentUserIDs() []string {
c.mu.RLock()
defer c.mu.RUnlock()

userIDs := make([]string, 0, len(c.cache))
for userID := range c.cache {
userIDs = append(userIDs, userID)
}

return userIDs
}

func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}

req.Header.Set(orgHeaderKey, userID)

resp, err := c.httpClient.Do(req)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}

var deleteRequests []DeleteRequest
if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil {
level.Error(log.Logger).Log("msg", "error marshalling response", "err", err)
return nil, err
}

return deleteRequests, nil
}
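
Picking up the review note above about injecting the HTTP client: below is a minimal sketch of the testing pattern it enables, assuming only the exported API shown in this diff. The fake `doer` and the test are hypothetical stand-ins, not the actual delete_requests_client_test.go referenced in the thread.

```go
package deletion

import (
	"bytes"
	"context"
	"io"
	"net/http"
	"testing"
	"time"
)

// fakeDoer satisfies the unexported doer interface. It records the tenant
// header it was called with and returns a canned, empty delete-request list.
type fakeDoer struct {
	lastOrgID string
}

func (f *fakeDoer) Do(req *http.Request) (*http.Response, error) {
	f.lastOrgID = req.Header.Get(orgHeaderKey)
	return &http.Response{
		StatusCode: http.StatusOK,
		Body:       io.NopCloser(bytes.NewBufferString(`[]`)),
	}, nil
}

func TestGetAllDeleteRequestsForUserSketch(t *testing.T) {
	fake := &fakeDoer{}
	// A long cache duration keeps the background refresh loop out of the way.
	client, err := NewDeleteRequestsClient("http://compactor:3100", fake,
		WithRequestClientCacheDuration(time.Hour))
	if err != nil {
		t.Fatal(err)
	}
	defer client.Stop()

	reqs, err := client.GetAllDeleteRequestsForUser(context.Background(), "tenant-a")
	if err != nil {
		t.Fatal(err)
	}
	if len(reqs) != 0 || fake.lastOrgID != "tenant-a" {
		t.Fatalf("unexpected result: %v (org %q)", reqs, fake.lastOrgID)
	}
}
```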