Skip to content

Commit

Permalink
Add draft of throttled copy
Browse files Browse the repository at this point in the history
  • Loading branch information
bstrausser committed Mar 3, 2024
1 parent 6391e73 commit f1ac460
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 2 deletions.
4 changes: 3 additions & 1 deletion cmd/desync/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type cacheOptions struct {
cache string
ignoreIndexes []string
ignoreChunks []string
throttleRateMillis int
}

func newCacheCommand(ctx context.Context) *cobra.Command {
Expand Down Expand Up @@ -43,6 +44,7 @@ file with --ignore-chunks <file>.`,
flags.StringVarP(&opt.cache, "cache", "c", "", "target store")
flags.StringSliceVarP(&opt.ignoreIndexes, "ignore", "", nil, "index(s) to ignore chunks from")
flags.StringSliceVarP(&opt.ignoreChunks, "ignore-chunks", "", nil, "ignore chunks from text file")
flags.IntVarP(&opt.throttleRateMillis, "throttle-rate-millis","",0,"throttle copy in millis")
addStoreOptions(&opt.cmdStoreOptions, flags)
return cmd
}
Expand Down Expand Up @@ -116,5 +118,5 @@ func runCache(ctx context.Context, opt cacheOptions, args []string) error {
pb := desync.NewProgressBar("")

// Pull all the chunks, and load them into the cache in the process
return desync.Copy(ctx, ids, s, dst, opt.n, pb)
return desync.Copy(ctx, ids, s, dst, opt.n, pb, opt.throttleRateMillis > 0, opt.throttleRateMillis)
}
41 changes: 40 additions & 1 deletion copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,49 @@ package desync

import (
"context"
"time"

"golang.org/x/sync/errgroup"
)

type TimeThrottle struct {
lastExecutionTime time.Time
minimumTimeBetweenEachExecution time.Duration
}

func (timeThrottle *TimeThrottle) reset() {
timeThrottle.lastExecutionTime = time.Now()
}

func (timeThrottle *TimeThrottle) calculateThrottle() (bool, time.Duration) {
r := -(time.Since(timeThrottle.lastExecutionTime) - timeThrottle.minimumTimeBetweenEachExecution)
return r > 0, r
}

func (timeThrottle *TimeThrottle) waitIfRequired() {

wait, duration := timeThrottle.calculateThrottle()
if wait {
time.Sleep(duration)
}
}

func buildThrottle(waitPeriodMillis int) TimeThrottle {

d := time.Millisecond * time.Duration(waitPeriodMillis)
return TimeThrottle{time.Now().Add(-d), time.Duration(d)}
}

// Copy reads a list of chunks from the provided src store, and copies the ones
// not already present in the dst store. The goal is to load chunks from remote
// store to populate a cache. If progress is provided, it'll be called when a
// chunk has been processed. Used to draw a progress bar, can be nil.
func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar) error {
func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar, shouldThrottle bool, waitPeriodMillis int) error {

in := make(chan ChunkID)
g, ctx := errgroup.WithContext(ctx)


// Setup and start the progressbar if any
pb.SetTotal(len(ids))
pb.Start()
Expand All @@ -22,6 +53,9 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int,
// Start the workers
for i := 0; i < n; i++ {
g.Go(func() error {
waitPeriodMillis := 200
throttle := buildThrottle(waitPeriodMillis)

for id := range in {
pb.Increment()
hasChunk, err := dst.HasChunk(id)
Expand All @@ -31,7 +65,12 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int,
if hasChunk {
continue
}
if shouldThrottle {
throttle.waitIfRequired()
}

chunk, err := src.GetChunk(id)
throttle.reset()
if err != nil {
return err
}
Expand Down
72 changes: 72 additions & 0 deletions copy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package desync

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestCopy(t *testing.T) {
src_store_dir := t.TempDir()
dst_store_dir := t.TempDir()

src_store, err := NewLocalStore(src_store_dir, StoreOptions{})
require.NoError(t, err)

dst_store, err := NewLocalStore(dst_store_dir, StoreOptions{})
require.NoError(t, err)

first_chunk_data := []byte("some data")
first_chunk := NewChunk(first_chunk_data)
first_chunk_id := first_chunk.ID()

src_store.StoreChunk(first_chunk)
has_the_stored_chunk, _ := src_store.HasChunk(first_chunk_id)
require.True(t, has_the_stored_chunk)

chunks := make([]ChunkID, 1)
chunks[0] = first_chunk_id

Copy(context.Background(), chunks, src_store, dst_store, 1, NewProgressBar(""),false,100)
require.NoError(t, err)
has_the_chunk, _ := dst_store.HasChunk(first_chunk_id)

require.True(t, has_the_chunk)
}

func TestTimeThrottle(t *testing.T) {

// If the wait time is zero, we never wait

wait := time.Duration(time.Millisecond * 0)
throttle := TimeThrottle{time.Now(), wait}
w, _ := throttle.calculateThrottle()
require.False(t, w)

past := time.Now().Add(-time.Hour * 1)
throttle = TimeThrottle{past, wait}
w, _ = throttle.calculateThrottle()
require.False(t, w)

wait = time.Duration(time.Hour * 1)
throttle = TimeThrottle{time.Now(), wait}
w, d := throttle.calculateThrottle()
require.True(t, w)
require.True(t, d > time.Duration(time.Minute*59))

// Assuming out last exection was in the past, we don't wait
past = time.Now().Add(-time.Hour * 1)
wait = time.Duration(time.Second * 60)
throttle = TimeThrottle{past, wait}
w, _ = throttle.calculateThrottle()
require.False(t, w)

wait = time.Duration(time.Second * 60)
throttle = TimeThrottle{time.Now(), wait}
present := throttle.lastExecutionTime
throttle.reset()
future := throttle.lastExecutionTime
require.True(t, present.Before(future))
}

0 comments on commit f1ac460

Please sign in to comment.