diff --git a/extension/storage/filestorage/README.md b/extension/storage/filestorage/README.md index c1a6bb5a9b4d..1ea12d91fb14 100644 --- a/extension/storage/filestorage/README.md +++ b/extension/storage/filestorage/README.md @@ -34,6 +34,10 @@ The default timeout is `1s`. `compaction.max_transaction_size` (default: 65536): defines maximum size of the compaction transaction. A value of zero will ignore transaction sizes. +`compaction.cleanup_on_start` (default: false) - specifies if removal of compaction temporary files is performed on start. +It will remove all the files in the compaction directory starting with tempdb, +temp files will be left if a previous run of the process is killed while compacting. + ### Rebound (online) compaction For rebound compaction, there are two additional parameters available: diff --git a/extension/storage/filestorage/client.go b/extension/storage/filestorage/client.go index c8fca4ba0201..1496ca14d464 100644 --- a/extension/storage/filestorage/client.go +++ b/extension/storage/filestorage/client.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "os" + "path/filepath" "sync" "syscall" "time" @@ -63,6 +64,9 @@ func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compa } client := &fileStorageClient{logger: logger, db: db, compactionCfg: compactionCfg, openTimeout: timeout} + if compactionCfg.CleanupOnStart { + client.cleanup(compactionCfg.Directory) + } if compactionCfg.OnRebound { client.startCompactionLoop(context.Background()) } @@ -342,3 +346,32 @@ func moveFileWithFallback(src string, dest string) error { err = os.Remove(src) return err } + +// cleanup left compaction temporary files from previous killed process +func (c *fileStorageClient) cleanup(compactionDirectory string) error { + pattern := filepath.Join(compactionDirectory, "tempdb*") + contents, err := filepath.Glob(pattern) + if err != nil { + return err + } + delCont := 0 + lockedCont := 0 + for _, item := range contents { + err = os.Remove(item) + if err == nil { + delCont++ + c.logger.Debug("cleanup", + zap.String("deletedFile", item)) + } else { + lockedCont++ + c.logger.Debug("cleanup", + zap.String("lockedFile", item), + zap.Error(err)) + } + + } + c.logger.Info("cleanup summary", + zap.Int("deletedFiles", delCont), + zap.Int("lockedFiles", lockedCont)) + return nil +} diff --git a/extension/storage/filestorage/client_test.go b/extension/storage/filestorage/client_test.go index 8717f553b90e..16b47f8cbfd0 100644 --- a/extension/storage/filestorage/client_test.go +++ b/extension/storage/filestorage/client_test.go @@ -404,6 +404,34 @@ func TestClientConcurrentCompaction(t *testing.T) { } } +func TestClientCleanupOnStart(t *testing.T) { + tempDir := t.TempDir() + dbFile := filepath.Join(tempDir, "my_db") + temp, _ := os.CreateTemp(tempDir, "tempdb") + // simulate ongoing compaction in another instance + tempLocked, _ := os.CreateTemp(tempDir, "tempdb") + temp.Close() + + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{ + Directory: tempDir, + CleanupOnStart: true, + }, false) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, client.Close(context.TODO())) + tempLocked.Close() + }) + + // check if cleanup removed the unlocked file and left db and locked file + files, err := os.ReadDir(tempDir) + require.NoError(t, err) + require.Equal(t, 2, len(files)) + require.Equal(t, "my_db", files[0].Name()) + _, f := filepath.Split(tempLocked.Name()) + require.Equal(t, f, files[1].Name()) +} + func BenchmarkClientGet(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") diff --git a/extension/storage/filestorage/config.go b/extension/storage/filestorage/config.go index d71bbe0234fc..19e288a7655b 100644 --- a/extension/storage/filestorage/config.go +++ b/extension/storage/filestorage/config.go @@ -45,6 +45,10 @@ type CompactionConfig struct { MaxTransactionSize int64 `mapstructure:"max_transaction_size,omitempty"` // CheckInterval specifies frequency of compaction check CheckInterval time.Duration `mapstructure:"check_interval,omitempty"` + // CleanupOnStart specifies removal of temporary files is performed on start. + // It will remove all the files in the compaction directory starting with tempdb, + // temp files will be left if a previous run of the process is killed while compacting. + CleanupOnStart bool `mapstructure:"cleanup_on_start,omitempty"` } func (cfg *Config) Validate() error { diff --git a/extension/storage/filestorage/config_test.go b/extension/storage/filestorage/config_test.go index 11d898a55f17..67decc8dbed5 100644 --- a/extension/storage/filestorage/config_test.go +++ b/extension/storage/filestorage/config_test.go @@ -45,6 +45,7 @@ func TestLoadConfig(t *testing.T) { ReboundTriggerThresholdMiB: 16, ReboundNeededThresholdMiB: 128, CheckInterval: time.Second * 5, + CleanupOnStart: true, }, Timeout: 2 * time.Second, FSync: true, diff --git a/extension/storage/filestorage/extension_test.go b/extension/storage/filestorage/extension_test.go index d808647b1293..cd4bf16f77fb 100644 --- a/extension/storage/filestorage/extension_test.go +++ b/extension/storage/filestorage/extension_test.go @@ -448,3 +448,38 @@ func TestCompactionRemoveTemp(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(files)) } + +func TestCleanupOnStart(t *testing.T) { + ctx := context.Background() + + tempDir := t.TempDir() + // simulate left temporary compaction file from killed process + temp, _ := os.CreateTemp(tempDir, "tempdb") + temp.Close() + + f := NewFactory() + cfg := f.CreateDefaultConfig().(*Config) + cfg.Directory = tempDir + cfg.Compaction.Directory = tempDir + cfg.Compaction.CleanupOnStart = true + extension, err := f.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + + se, ok := extension.(storage.Extension) + require.True(t, ok) + + client, err := se.GetClient( + ctx, + component.KindReceiver, + newTestEntity("my_component"), + "", + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, client.Close(ctx)) + }) + + files, err := os.ReadDir(tempDir) + require.NoError(t, err) + require.Equal(t, 1, len(files)) +} diff --git a/extension/storage/filestorage/factory.go b/extension/storage/filestorage/factory.go index ef3e04e9d3c7..18178c54a4ea 100644 --- a/extension/storage/filestorage/factory.go +++ b/extension/storage/filestorage/factory.go @@ -45,6 +45,7 @@ func createDefaultConfig() component.Config { ReboundNeededThresholdMiB: defaultReboundNeededThresholdMib, ReboundTriggerThresholdMiB: defaultReboundTriggerThresholdMib, CheckInterval: defaultCompactionInterval, + CleanupOnStart: false, }, Timeout: time.Second, FSync: false, diff --git a/extension/storage/filestorage/testdata/config.yaml b/extension/storage/filestorage/testdata/config.yaml index 4a923aee71fe..bcdbaac9a291 100644 --- a/extension/storage/filestorage/testdata/config.yaml +++ b/extension/storage/filestorage/testdata/config.yaml @@ -12,5 +12,6 @@ file_storage/all_settings: rebound_trigger_threshold_mib: 16 rebound_needed_threshold_mib: 128 max_transaction_size: 2048 + cleanup_on_start: true timeout: 2s fsync: true