Skip to content

Commit

Permalink
Add CleanupOnStart for compaction temporary files
Browse files Browse the repository at this point in the history
  • Loading branch information
pandres-varian committed May 6, 2024
1 parent 34e64e0 commit fe0d559
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 0 deletions.
4 changes: 4 additions & 0 deletions extension/storage/filestorage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ The default timeout is `1s`.
`compaction.max_transaction_size` (default: 65536): defines maximum size of the compaction transaction.
A value of zero will ignore transaction sizes.

`compaction.cleanup_on_start` (default: false) - specifies if removal of compaction temporary files is performed on start.
It will remove all the files in the compaction directory starting with tempdb,
temp files will be left if a previous run of the process is killed while compacting.

### Rebound (online) compaction

For rebound compaction, there are two additional parameters available:
Expand Down
33 changes: 33 additions & 0 deletions extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -63,6 +64,9 @@ func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compa
}

client := &fileStorageClient{logger: logger, db: db, compactionCfg: compactionCfg, openTimeout: timeout}
if compactionCfg.CleanupOnStart {
client.cleanup(compactionCfg.Directory)
}
if compactionCfg.OnRebound {
client.startCompactionLoop(context.Background())
}
Expand Down Expand Up @@ -342,3 +346,32 @@ func moveFileWithFallback(src string, dest string) error {
err = os.Remove(src)
return err
}

// cleanup left compaction temporary files from previous killed process
func (c *fileStorageClient) cleanup(compactionDirectory string) error {
pattern := filepath.Join(compactionDirectory, "tempdb*")
contents, err := filepath.Glob(pattern)
if err != nil {
return err
}
delCont := 0
lockedCont := 0
for _, item := range contents {
err = os.Remove(item)
if err == nil {
delCont++
c.logger.Debug("cleanup",
zap.String("deletedFile", item))
} else {
lockedCont++
c.logger.Debug("cleanup",
zap.String("lockedFile", item),
zap.Error(err))
}

}
c.logger.Info("cleanup summary",
zap.Int("deletedFiles", delCont),
zap.Int("lockedFiles", lockedCont))
return nil
}
28 changes: 28 additions & 0 deletions extension/storage/filestorage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,34 @@ func TestClientConcurrentCompaction(t *testing.T) {
}
}

func TestClientCleanupOnStart(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")
temp, _ := os.CreateTemp(tempDir, "tempdb")
// simulate ongoing compaction in another instance
tempLocked, _ := os.CreateTemp(tempDir, "tempdb")
temp.Close()

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{
Directory: tempDir,
CleanupOnStart: true,
}, false)
require.NoError(t, err)

t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
tempLocked.Close()
})

// check if cleanup removed the unlocked file and left db and locked file
files, err := os.ReadDir(tempDir)
require.NoError(t, err)
require.Equal(t, 2, len(files))
require.Equal(t, "my_db", files[0].Name())
_, f := filepath.Split(tempLocked.Name())
require.Equal(t, f, files[1].Name())
}

func BenchmarkClientGet(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")
Expand Down
4 changes: 4 additions & 0 deletions extension/storage/filestorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type CompactionConfig struct {
MaxTransactionSize int64 `mapstructure:"max_transaction_size,omitempty"`
// CheckInterval specifies frequency of compaction check
CheckInterval time.Duration `mapstructure:"check_interval,omitempty"`
// CleanupOnStart specifies removal of temporary files is performed on start.
// It will remove all the files in the compaction directory starting with tempdb,
// temp files will be left if a previous run of the process is killed while compacting.
CleanupOnStart bool `mapstructure:"cleanup_on_start,omitempty"`
}

func (cfg *Config) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestLoadConfig(t *testing.T) {
ReboundTriggerThresholdMiB: 16,
ReboundNeededThresholdMiB: 128,
CheckInterval: time.Second * 5,
CleanupOnStart: true,
},
Timeout: 2 * time.Second,
FSync: true,
Expand Down
35 changes: 35 additions & 0 deletions extension/storage/filestorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,38 @@ func TestCompactionRemoveTemp(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(files))
}

func TestCleanupOnStart(t *testing.T) {
ctx := context.Background()

tempDir := t.TempDir()
// simulate left temporary compaction file from killed process
temp, _ := os.CreateTemp(tempDir, "tempdb")
temp.Close()

f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
cfg.Directory = tempDir
cfg.Compaction.Directory = tempDir
cfg.Compaction.CleanupOnStart = true
extension, err := f.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg)
require.NoError(t, err)

se, ok := extension.(storage.Extension)
require.True(t, ok)

client, err := se.GetClient(
ctx,
component.KindReceiver,
newTestEntity("my_component"),
"",
)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(ctx))
})

files, err := os.ReadDir(tempDir)
require.NoError(t, err)
require.Equal(t, 1, len(files))
}
1 change: 1 addition & 0 deletions extension/storage/filestorage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func createDefaultConfig() component.Config {
ReboundNeededThresholdMiB: defaultReboundNeededThresholdMib,
ReboundTriggerThresholdMiB: defaultReboundTriggerThresholdMib,
CheckInterval: defaultCompactionInterval,
CleanupOnStart: false,
},
Timeout: time.Second,
FSync: false,
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ file_storage/all_settings:
rebound_trigger_threshold_mib: 16
rebound_needed_threshold_mib: 128
max_transaction_size: 2048
cleanup_on_start: true
timeout: 2s
fsync: true

0 comments on commit fe0d559

Please sign in to comment.