Skip to content

Commit

Permalink
Remove whole stream deletion mode (#6435)
Browse files Browse the repository at this point in the history
* Remove whole-stream-deletion mode

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove whole-stream-deletion from docs

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update the changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Sort changelog entries

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove link to wrong configuration

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix integration test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Set default deletion mode to disabled

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove extra white line in documentation

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix default value in docs

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add DeletionEnabled method on mode

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rename test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
  • Loading branch information
MichelHollands authored Jun 22, 2022
1 parent 798677a commit f80e487
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Main

* [6444](https://github.com/grafana/loki/pull/6444) **aminesnow** Add TLS config to query frontend.
* [6435](https://github.com/grafana/loki/pull/6430) **MichelHollands**: Remove the `whole-stream-deletion` mode.
* [6415](https://github.com/grafana/loki/pull/6415) **salvacorts** Evenly spread queriers across kubernetes nodes.
* [6410](https://github.com/grafana/loki/pull/6410) **MichelHollands**: Add support for per tenant delete API access enabling.
* [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields.
Expand Down
11 changes: 3 additions & 8 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2109,23 +2109,18 @@ compacts index shards to more performant forms.
# CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period
[delete_request_cancel_period: <duration> | default = 24h]
# Which deletion mode to use. Supported values are: disabled,
# whole-stream-deletion, filter-only, and filter-and-delete.
# CLI flag: -boltdb.shipper.compactor.deletion-mode
[deletion_mode: <string> | default = "whole-stream-deletion"]
# Maximum number of tables to compact in parallel.
# While increasing this value, please make sure compactor has enough disk space
# allocated to be able to store and compact as many tables.
# CLI flag: -boltdb.shipper.compactor.max-compaction-parallelism
[max_compaction_parallelism: <int> | default = 1]
# Deletion mode.
# Can be one of "disabled", "whole-stream-deletion", "filter-only", or "filter-and-delete".
# When set to the default value of "whole-stream-deletion", and if
# Can be one of "disabled", "filter-only", or "filter-and-delete".
# When set to "filter-only" or "filter-and-delete", and if
# retention_enabled is true, then the log entry deletion API endpoints are available.
# CLI flag: -boltdb.shipper.compactor.deletion-mode
[deletion_mode: <string> | default = "whole-stream-deletion"]
[deletion_mode: <string> | default = "disabled"]
# The hash ring configuration used by compactors to elect a single instance for running compactions
# The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring
Expand Down
4 changes: 1 addition & 3 deletions docs/sources/operations/storage/logs-deletion.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ Log entry deletion is supported _only_ for the BoltDB Shipper index store.
Grafana Loki supports the deletion of log entries from a specified stream.
Log entries that fall within a specified time window and match an optional line filter are those that will be deleted.


The Compactor component exposes REST endpoints that process delete requests.
Hitting the endpoint specifies the streams and the time window.
The deletion of the log entries takes place after a configurable cancellation time period expires.
Expand All @@ -18,9 +17,8 @@ Log entry deletion relies on configuration of the custom logs retention workflow

## Configuration

Enable log entry deletion by setting `retention_enabled` to true and `deletion_mode` to `whole-stream-deletion`, `filter-only`, or `filter-and-delete` in the compactor's configuration. See the example in [Retention configuration](../retention#retention-configuration).
Enable log entry deletion by setting `retention_enabled` to true and `deletion_mode` to `filter-only` or `filter-and-delete` in the compactor's configuration.

With `whole-stream-deletion`, all the log entries matching the query given in the delete request are removed.
With `filter-only`, log lines matching the query in the delete request are filtered out when querying Loki. They are not removed from the on-disk chunks.
With `filter-and-delete`, log lines matching the query in the delete request are filtered out when querying Loki, and they are also removed from the on-disk chunks.

Expand Down
2 changes: 2 additions & 0 deletions integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL().Host,
"-frontend.default-validity=0s",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host,
"-common.compactor-address="+tCompactor.HTTPURL().String(),
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL().Host,
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host,
"-common.compactor-address="+tCompactor.HTTPURL().String(),
)
)

Expand Down
21 changes: 8 additions & 13 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
}

func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) {
filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode)
filteringEnabled, err := deletion.DeleteEnabled(t.Cfg.CompactorConfig.DeletionMode)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -880,16 +880,11 @@ func (t *Loki) initCompactor() (services.Service, error) {

t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)

if t.Cfg.CompactorConfig.RetentionEnabled {
switch t.compactor.DeleteMode() {
case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler()))
default:
break
}
if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode().DeleteEnabled() {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler()))
}

return t.compactor, nil
Expand Down Expand Up @@ -993,12 +988,12 @@ func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode)
deleteEnabled, err := deletion.DeleteEnabled(t.Cfg.CompactorConfig.DeletionMode)
if err != nil {
return nil, err
}

if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled {
if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !deleteEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|")))
f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "disabled", fmt.Sprintf("Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|")))
cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f)
f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)")
}
Expand Down Expand Up @@ -231,12 +231,11 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem
return err
}

switch c.deleteMode {
case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
if c.deleteMode.DeleteEnabled() {
if err := c.initDeletes(r, limits); err != nil {
return err
}
default:
} else {
c.expirationChecker = newExpirationChecker(
retention.NewExpirationChecker(limits),
// This is a dummy deletion ExpirationChecker that never expires anything
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}{
{
name: "no delete requests",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
expectedResp: resp{
isExpired: false,
nonDeletedIntervals: nil,
},
},
{
name: "no relevant delete requests",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: "different-user",
Expand All @@ -65,7 +65,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "whole chunk deleted by single request",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand All @@ -81,7 +81,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "deleted interval out of range",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand All @@ -97,7 +97,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "multiple delete requests with one deleting the whole chunk",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand All @@ -119,7 +119,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "multiple delete requests causing multiple holes",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "multiple overlapping requests deleting the whole chunk",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand All @@ -194,7 +194,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "multiple non-overlapping requests deleting the whole chunk",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand Down
17 changes: 8 additions & 9 deletions pkg/storage/stores/shipper/compactor/deletion/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ var (
)

const (
Disabled Mode = iota
WholeStreamDeletion // The existing log deletion that removes whole streams.
Disabled Mode = iota
FilterOnly
FilterAndDelete
)
Expand All @@ -21,8 +20,6 @@ func (m Mode) String() string {
switch m {
case Disabled:
return "disabled"
case WholeStreamDeletion:
return "whole-stream-deletion"
case FilterOnly:
return "filter-only"
case FilterAndDelete:
Expand All @@ -31,16 +28,18 @@ func (m Mode) String() string {
return "unknown"
}

func (m Mode) DeleteEnabled() bool {
return m == FilterOnly || m == FilterAndDelete
}

func AllModes() []string {
return []string{Disabled.String(), WholeStreamDeletion.String(), FilterOnly.String(), FilterAndDelete.String()}
return []string{Disabled.String(), FilterOnly.String(), FilterAndDelete.String()}
}

func ParseMode(in string) (Mode, error) {
switch in {
case "disabled":
return Disabled, nil
case "whole-stream-deletion":
return WholeStreamDeletion, nil
case "filter-only":
return FilterOnly, nil
case "filter-and-delete":
Expand All @@ -49,11 +48,11 @@ func ParseMode(in string) (Mode, error) {
return 0, errUnknownMode
}

func FilteringEnabled(in string) (bool, error) {
func DeleteEnabled(in string) (bool, error) {
deleteMode, err := ParseMode(in)
if err != nil {
return false, err
}

return deleteMode == FilterOnly || deleteMode == FilterAndDelete, nil
return deleteMode.DeleteEnabled(), nil
}
24 changes: 19 additions & 5 deletions pkg/storage/stores/shipper/compactor/deletion/mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@ import (

func TestAllModes(t *testing.T) {
modes := AllModes()
require.ElementsMatch(t, []string{"disabled", "whole-stream-deletion", "filter-only", "filter-and-delete"}, modes)
require.ElementsMatch(t, []string{"disabled", "filter-only", "filter-and-delete"}, modes)
}

func TestParseMode(t *testing.T) {
mode, err := ParseMode("disabled")
require.NoError(t, err)
require.Equal(t, Disabled, mode)

mode, err = ParseMode("whole-stream-deletion")
require.NoError(t, err)
require.Equal(t, WholeStreamDeletion, mode)

mode, err = ParseMode("filter-only")
require.NoError(t, err)
require.Equal(t, FilterOnly, mode)
Expand All @@ -31,3 +27,21 @@ func TestParseMode(t *testing.T) {
_, err = ParseMode("something-else")
require.ErrorIs(t, errUnknownMode, err)
}

func TestDeleteEnabled(t *testing.T) {
enabled, err := DeleteEnabled("disabled")
require.NoError(t, err)
require.False(t, enabled)

enabled, err = DeleteEnabled("filter-only")
require.NoError(t, err)
require.True(t, enabled)

enabled, err = DeleteEnabled("filter-and-delete")
require.NoError(t, err)
require.True(t, enabled)

enabled, err = DeleteEnabled("some other value")
require.Error(t, err)
require.False(t, enabled)
}

0 comments on commit f80e487

Please sign in to comment.