Skip to content

Commit

Permalink
feat: add support for using sqlite for storing delete requests (#16437)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored Feb 28, 2025
1 parent 7f0dc01 commit 5b33e65
Show file tree
Hide file tree
Showing 1,344 changed files with 5,529,101 additions and 766 deletions.
10 changes: 10 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1837,6 +1837,16 @@ retention_backoff_config:
# CLI flag: -compactor.delete-request-store.key-prefix
[delete_request_store_key_prefix: <string> | default = "index/"]
# Type of DB to use for storing delete requests. Supported types: boltdb, sqlite
# CLI flag: -compactor.delete-request-store.db-type
[delete_request_store_db_type: <string> | default = "boltdb"]
# Type of DB to use as backup for storing delete requests. Backup DB should
# ideally be used while migrating from one DB type to another. Supported
# type(s): boltdb
# CLI flag: -compactor.delete-request-store.backup-db-type
[backup_delete_request_store_db_type: <string> | default = ""]
# The max number of delete requests to run per compaction cycle.
# CLI flag: -compactor.delete-batch-size
[delete_batch_size: <int> | default = 70]
Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ require (
gotest.tools v2.2.0+incompatible
k8s.io/apimachinery v0.32.2
k8s.io/utils v0.0.0-20241210054802-24370beab758
zombiezen.com/go/sqlite v1.4.0
)

require (
Expand Down Expand Up @@ -187,6 +188,7 @@ require (
github.com/minio/crc64nvme v1.0.1 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.116.0 // indirect
Expand All @@ -196,6 +198,7 @@ require (
github.com/pkg/xattr v0.4.10 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
Expand All @@ -214,6 +217,10 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.33.0 // indirect
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
modernc.org/libc v1.55.3 // indirect
modernc.org/mathutil v1.6.0 // indirect
modernc.org/memory v1.8.0 // indirect
modernc.org/sqlite v1.34.1 // indirect
)

require (
Expand Down
30 changes: 30 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,8 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks=
github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/ncw/swift/v2 v2.0.3 h1:8R9dmgFIWs+RiVlisCEfiQiik1hjuR0JnOkLxaP9ihg=
Expand Down Expand Up @@ -1068,6 +1070,8 @@ github.com/redis/go-redis/v9 v9.7.1 h1:4LhKRCIduqXqtvCUlaq9c8bdHOkICjDMrr1+Zb3os
github.com/redis/go-redis/v9 v9.7.1/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo=
github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U=
github.com/richardartoul/molecule v1.0.0/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
Expand Down Expand Up @@ -1773,6 +1777,30 @@ k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJ
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f/go.mod h1:R/HEjbvWI0qdfb8viZUeVZm0X6IZnxAydC7YU42CMw4=
k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0=
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ=
modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ=
modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y=
modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s=
modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw=
modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU=
modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U=
modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w=
modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo=
modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E=
modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU=
modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc=
modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss=
modernc.org/sqlite v1.34.1 h1:u3Yi6M0N8t9yKRDwhXcyp1eS5/ErhPTBggxWFuR6Hfk=
modernc.org/sqlite v1.34.1/go.mod h1:pXV2xHxhzXZsgT/RtTFAPY6JJDEvOTcTdwADQCCWD4k=
modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
Expand All @@ -1786,3 +1814,5 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
zombiezen.com/go/sqlite v1.4.0 h1:N1s3RIljwtp4541Y8rM880qgGIgq3fTD2yks1xftnKU=
zombiezen.com/go/sqlite v1.4.0/go.mod h1:0w9F1DN9IZj9AcLS9YDKMboubCACkwYCGkzoy3eG5ik=
48 changes: 28 additions & 20 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"unsafe"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -71,25 +73,27 @@ var (
)

// Config holds the compactor's configuration; each field is mapped to a YAML
// key through its struct tag.
//
// NOTE(review): this listing comes from a rendered diff, so the field list
// appears twice. The first run of fields is the pre-change version and the
// second run is the post-change version, which adds DeleteRequestStoreDBType
// and BackupDeleteRequestStoreDBType.
type Config struct {
WorkingDirectory string `yaml:"working_directory"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
ApplyRetentionInterval time.Duration `yaml:"apply_retention_interval"`
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"`
RetentionBackoffConfig backoff.Config `yaml:"retention_backoff_config"`
DeleteRequestStore string `yaml:"delete_request_store"`
DeleteRequestStoreKeyPrefix string `yaml:"delete_request_store_key_prefix"`
DeleteBatchSize int `yaml:"delete_batch_size"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
DeleteMaxInterval time.Duration `yaml:"delete_max_interval"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
UploadParallelism int `yaml:"upload_parallelism"`
CompactorRing lokiring.RingConfig `yaml:"compactor_ring,omitempty" doc:"description=The hash ring configuration used by compactors to elect a single instance for running compactions. The CLI flags prefix for this block config is: compactor.ring"`
RunOnce bool `yaml:"_" doc:"hidden"`
TablesToCompact int `yaml:"tables_to_compact"`
SkipLatestNTables int `yaml:"skip_latest_n_tables"`
WorkingDirectory string `yaml:"working_directory"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
ApplyRetentionInterval time.Duration `yaml:"apply_retention_interval"`
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"`
RetentionBackoffConfig backoff.Config `yaml:"retention_backoff_config"`
DeleteRequestStore string `yaml:"delete_request_store"`
DeleteRequestStoreKeyPrefix string `yaml:"delete_request_store_key_prefix"`
// DeleteRequestStoreDBType is the type of DB to use for storing delete
// requests; it is registered as -compactor.delete-request-store.db-type.
DeleteRequestStoreDBType string `yaml:"delete_request_store_db_type"`
// BackupDeleteRequestStoreDBType is the type of DB to use as a backup for
// storing delete requests, meant for use while migrating from one DB type
// to another; it is registered as
// -compactor.delete-request-store.backup-db-type and defaults to "".
BackupDeleteRequestStoreDBType string `yaml:"backup_delete_request_store_db_type"`
DeleteBatchSize int `yaml:"delete_batch_size"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
DeleteMaxInterval time.Duration `yaml:"delete_max_interval"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
UploadParallelism int `yaml:"upload_parallelism"`
CompactorRing lokiring.RingConfig `yaml:"compactor_ring,omitempty" doc:"description=The hash ring configuration used by compactors to elect a single instance for running compactions. The CLI flags prefix for this block config is: compactor.ring"`
RunOnce bool `yaml:"_" doc:"hidden"`
TablesToCompact int `yaml:"tables_to_compact"`
SkipLatestNTables int `yaml:"skip_latest_n_tables"`
}

// RegisterFlags registers flags.
Expand All @@ -102,6 +106,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.RetentionDeleteWorkCount, "compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.StringVar(&cfg.DeleteRequestStore, "compactor.delete-request-store", "", "Store used for managing delete requests.")
f.StringVar(&cfg.DeleteRequestStoreKeyPrefix, "compactor.delete-request-store.key-prefix", "index/", "Path prefix for storing delete requests.")
f.StringVar(&cfg.DeleteRequestStoreDBType, "compactor.delete-request-store.db-type", string(deletion.DeleteRequestsStoreDBTypeBoltDB), fmt.Sprintf("Type of DB to use for storing delete requests. Supported types: %s", strings.Join(*(*[]string)(unsafe.Pointer(&deletion.SupportedDeleteRequestsStoreDBTypes)), ", ")))
f.StringVar(&cfg.BackupDeleteRequestStoreDBType, "compactor.delete-request-store.backup-db-type", "", fmt.Sprintf("Type of DB to use as backup for storing delete requests. Backup DB should ideally be used while migrating from one DB type to another. Supported type(s): %s", deletion.DeleteRequestsStoreDBTypeBoltDB))
f.IntVar(&cfg.DeleteBatchSize, "compactor.delete-batch-size", 70, "The max number of delete requests to run per compaction cycle.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.DurationVar(&cfg.DeleteMaxInterval, "compactor.delete-max-interval", 24*time.Hour, "Constrain the size of any single delete request with line filters. When a delete request > delete_max_interval is input, the request is sharded into smaller requests of no more than delete_max_interval")
Expand Down Expand Up @@ -361,10 +367,12 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie

func (c *Compactor) initDeletes(objectClient client.ObjectClient, r prometheus.Registerer, limits Limits) error {
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")
store, err := deletion.NewDeleteStore(deletionWorkDir, storage.NewIndexStorageClient(objectClient, c.cfg.DeleteRequestStoreKeyPrefix))
indexStorageClient := storage.NewIndexStorageClient(objectClient, c.cfg.DeleteRequestStoreKeyPrefix)
store, err := deletion.NewDeleteRequestsStore(deletion.DeleteRequestsStoreDBType(c.cfg.DeleteRequestStoreDBType), deletionWorkDir, indexStorageClient, deletion.DeleteRequestsStoreDBType(c.cfg.BackupDeleteRequestStoreDBType))
if err != nil {
return err
}

c.deleteRequestsStore = store

c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@ type deleteRequestsTable struct {
wg sync.WaitGroup
}

// NOTE(review): rendered diff. The parenthesized const block below is the
// pre-change declaration; the single const after it is its replacement.
const (
deleteRequestsIndexFileName = DeleteRequestsTableName + ".gz"
deleteRequestsSQLiteFileName = DeleteRequestsTableName + ".sqlite.gz"
)
// deleteRequestsDBBoltDBFileName is the file name, within the
// DeleteRequestsTableName table, that the delete requests table passes to the
// index storage client's GetFile and PutFile calls.
const deleteRequestsDBBoltDBFileName = DeleteRequestsTableName + ".gz"

func newDeleteRequestsTable(workingDirectory string, indexStorageClient storage.Client) (index.Client, error) {
dbPath := filepath.Join(workingDirectory, DeleteRequestsTableName, DeleteRequestsTableName)
dbPath := filepath.Join(workingDirectory, DeleteRequestsTableName)
boltdbIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: filepath.Dir(dbPath)})
if err != nil {
return nil, err
Expand Down Expand Up @@ -72,8 +69,8 @@ func (t *deleteRequestsTable) init() error {
_, err := os.Stat(t.dbPath)
if err != nil {
err = storage.DownloadFileFromStorage(t.dbPath, true,
true, storage.LoggerWithFilename(util_log.Logger, deleteRequestsIndexFileName), func() (io.ReadCloser, error) {
return t.indexStorageClient.GetFile(context.Background(), DeleteRequestsTableName, deleteRequestsIndexFileName)
true, storage.LoggerWithFilename(util_log.Logger, deleteRequestsDBBoltDBFileName), func() (io.ReadCloser, error) {
return t.indexStorageClient.GetFile(context.Background(), DeleteRequestsTableName, deleteRequestsDBBoltDBFileName)
})
if err != nil && !t.indexStorageClient.IsFileNotFoundErr(err) {
return err
Expand Down Expand Up @@ -153,7 +150,7 @@ func (t *deleteRequestsTable) uploadFile() error {
return err
}

if err := t.indexStorageClient.PutFile(context.Background(), DeleteRequestsTableName, deleteRequestsIndexFileName, f); err != nil {
if err := t.indexStorageClient.PutFile(context.Background(), DeleteRequestsTableName, deleteRequestsDBBoltDBFileName, f); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 5b33e65

Please sign in to comment.