From f1ac460b9e6b405be8d6e75d6a4b3ab01213b401 Mon Sep 17 00:00:00 2001 From: bstrausser Date: Sun, 3 Mar 2024 10:47:18 -0500 Subject: [PATCH 01/11] Add draft of throttled copy --- cmd/desync/cache.go | 4 ++- copy.go | 41 +++++++++++++++++++++++++- copy_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 copy_test.go diff --git a/cmd/desync/cache.go b/cmd/desync/cache.go index 8c2f8f9..53b6b4c 100644 --- a/cmd/desync/cache.go +++ b/cmd/desync/cache.go @@ -14,6 +14,7 @@ type cacheOptions struct { cache string ignoreIndexes []string ignoreChunks []string + throttleRateMillis int } func newCacheCommand(ctx context.Context) *cobra.Command { @@ -43,6 +44,7 @@ file with --ignore-chunks .`, flags.StringVarP(&opt.cache, "cache", "c", "", "target store") flags.StringSliceVarP(&opt.ignoreIndexes, "ignore", "", nil, "index(s) to ignore chunks from") flags.StringSliceVarP(&opt.ignoreChunks, "ignore-chunks", "", nil, "ignore chunks from text file") + flags.IntVarP(&opt.throttleRateMillis, "throttle-rate-millis","",0,"throttle copy in millis") addStoreOptions(&opt.cmdStoreOptions, flags) return cmd } @@ -116,5 +118,5 @@ func runCache(ctx context.Context, opt cacheOptions, args []string) error { pb := desync.NewProgressBar("") // Pull all the chunks, and load them into the cache in the process - return desync.Copy(ctx, ids, s, dst, opt.n, pb) + return desync.Copy(ctx, ids, s, dst, opt.n, pb, opt.throttleRateMillis > 0, opt.throttleRateMillis) } diff --git a/copy.go b/copy.go index 940d4c8..df0401d 100644 --- a/copy.go +++ b/copy.go @@ -2,18 +2,49 @@ package desync import ( "context" + "time" "golang.org/x/sync/errgroup" ) +type TimeThrottle struct { + lastExecutionTime time.Time + minimumTimeBetweenEachExecution time.Duration +} + +func (timeThrottle *TimeThrottle) reset() { + timeThrottle.lastExecutionTime = time.Now() +} + +func (timeThrottle *TimeThrottle) calculateThrottle() (bool, time.Duration) { + r := -(time.Since(timeThrottle.lastExecutionTime) - timeThrottle.minimumTimeBetweenEachExecution) + return r > 0, r +} + +func (timeThrottle *TimeThrottle) waitIfRequired() { + + wait, duration := timeThrottle.calculateThrottle() + if wait { + time.Sleep(duration) + } +} + +func buildThrottle(waitPeriodMillis int) TimeThrottle { + + d := time.Millisecond * time.Duration(waitPeriodMillis) + return TimeThrottle{time.Now().Add(-d), time.Duration(d)} +} + // Copy reads a list of chunks from the provided src store, and copies the ones // not already present in the dst store. The goal is to load chunks from remote // store to populate a cache. If progress is provided, it'll be called when a // chunk has been processed. Used to draw a progress bar, can be nil. -func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar) error { +func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar, shouldThrottle bool, waitPeriodMillis int) error { + in := make(chan ChunkID) g, ctx := errgroup.WithContext(ctx) + // Setup and start the progressbar if any pb.SetTotal(len(ids)) pb.Start() @@ -22,6 +53,9 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, // Start the workers for i := 0; i < n; i++ { g.Go(func() error { + waitPeriodMillis := 200 + throttle := buildThrottle(waitPeriodMillis) + for id := range in { pb.Increment() hasChunk, err := dst.HasChunk(id) @@ -31,7 +65,12 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, if hasChunk { continue } + if shouldThrottle { + throttle.waitIfRequired() + } + chunk, err := src.GetChunk(id) + throttle.reset() if err != nil { return err } diff --git a/copy_test.go b/copy_test.go new file mode 100644 index 0000000..12f7751 --- /dev/null +++ b/copy_test.go @@ -0,0 +1,72 @@ +package desync + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCopy(t *testing.T) { + src_store_dir := t.TempDir() + dst_store_dir := t.TempDir() + + src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) + require.NoError(t, err) + + dst_store, err := NewLocalStore(dst_store_dir, StoreOptions{}) + require.NoError(t, err) + + first_chunk_data := []byte("some data") + first_chunk := NewChunk(first_chunk_data) + first_chunk_id := first_chunk.ID() + + src_store.StoreChunk(first_chunk) + has_the_stored_chunk, _ := src_store.HasChunk(first_chunk_id) + require.True(t, has_the_stored_chunk) + + chunks := make([]ChunkID, 1) + chunks[0] = first_chunk_id + + Copy(context.Background(), chunks, src_store, dst_store, 1, NewProgressBar(""),false,100) + require.NoError(t, err) + has_the_chunk, _ := dst_store.HasChunk(first_chunk_id) + + require.True(t, has_the_chunk) +} + +func TestTimeThrottle(t *testing.T) { + + // If the wait time is zero, we never wait + + wait := time.Duration(time.Millisecond * 0) + throttle := TimeThrottle{time.Now(), wait} + w, _ := throttle.calculateThrottle() + require.False(t, w) + + past := time.Now().Add(-time.Hour * 1) + throttle = TimeThrottle{past, wait} + w, _ = throttle.calculateThrottle() + require.False(t, w) + + wait = time.Duration(time.Hour * 1) + throttle = TimeThrottle{time.Now(), wait} + w, d := throttle.calculateThrottle() + require.True(t, w) + require.True(t, d > time.Duration(time.Minute*59)) + + // Assuming out last exection was in the past, we don't wait + past = time.Now().Add(-time.Hour * 1) + wait = time.Duration(time.Second * 60) + throttle = TimeThrottle{past, wait} + w, _ = throttle.calculateThrottle() + require.False(t, w) + + wait = time.Duration(time.Second * 60) + throttle = TimeThrottle{time.Now(), wait} + present := throttle.lastExecutionTime + throttle.reset() + future := throttle.lastExecutionTime + require.True(t, present.Before(future)) +} From 80627b29dd31da2f8540a472bfe46280009c4f0a Mon Sep 17 00:00:00 2001 From: bstrausser Date: Sun, 3 Mar 2024 12:33:29 -0500 Subject: [PATCH 02/11] Try fix failing test --- copy_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/copy_test.go b/copy_test.go index 12f7751..6a1a2f1 100644 --- a/copy_test.go +++ b/copy_test.go @@ -66,6 +66,9 @@ func TestTimeThrottle(t *testing.T) { wait = time.Duration(time.Second * 60) throttle = TimeThrottle{time.Now(), wait} present := throttle.lastExecutionTime + // Without the sleep this can fail. At least on windows + // https://github.com/folbricht/desync/actions/runs/8131384060/job/22220648517?pr=258 + time.Sleep(time.Duration(time.Millisecond*100)) throttle.reset() future := throttle.lastExecutionTime require.True(t, present.Before(future)) From 4e9f68ec484407b86721a3957805d0a2326abff5 Mon Sep 17 00:00:00 2001 From: bstrausser Date: Tue, 5 Mar 2024 14:16:36 -0500 Subject: [PATCH 03/11] Create limiting store --- copy.go | 41 ++++----------------------- copy_test.go | 41 +-------------------------- go.mod | 1 + go.sum | 2 ++ ratelimitstore.go | 63 ++++++++++++++++++++++++++++++++++++++++++ ratelimitstore_test.go | 58 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 130 insertions(+), 76 deletions(-) create mode 100644 ratelimitstore.go create mode 100644 ratelimitstore_test.go diff --git a/copy.go b/copy.go index df0401d..79c26d4 100644 --- a/copy.go +++ b/copy.go @@ -2,44 +2,15 @@ package desync import ( "context" - "time" "golang.org/x/sync/errgroup" ) -type TimeThrottle struct { - lastExecutionTime time.Time - minimumTimeBetweenEachExecution time.Duration -} - -func (timeThrottle *TimeThrottle) reset() { - timeThrottle.lastExecutionTime = time.Now() -} - -func (timeThrottle *TimeThrottle) calculateThrottle() (bool, time.Duration) { - r := -(time.Since(timeThrottle.lastExecutionTime) - timeThrottle.minimumTimeBetweenEachExecution) - return r > 0, r -} - -func (timeThrottle *TimeThrottle) waitIfRequired() { - - wait, duration := timeThrottle.calculateThrottle() - if wait { - time.Sleep(duration) - } -} - -func buildThrottle(waitPeriodMillis int) TimeThrottle { - - d := time.Millisecond * time.Duration(waitPeriodMillis) - return TimeThrottle{time.Now().Add(-d), time.Duration(d)} -} - // Copy reads a list of chunks from the provided src store, and copies the ones // not already present in the dst store. The goal is to load chunks from remote // store to populate a cache. If progress is provided, it'll be called when a // chunk has been processed. Used to draw a progress bar, can be nil. -func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar, shouldThrottle bool, waitPeriodMillis int) error { +func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar) error { in := make(chan ChunkID) g, ctx := errgroup.WithContext(ctx) @@ -53,8 +24,8 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, // Start the workers for i := 0; i < n; i++ { g.Go(func() error { - waitPeriodMillis := 200 - throttle := buildThrottle(waitPeriodMillis) + + for id := range in { pb.Increment() @@ -65,12 +36,10 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, if hasChunk { continue } - if shouldThrottle { - throttle.waitIfRequired() - } + chunk, err := src.GetChunk(id) - throttle.reset() + if err != nil { return err } diff --git a/copy_test.go b/copy_test.go index 6a1a2f1..a7bdf3f 100644 --- a/copy_test.go +++ b/copy_test.go @@ -3,7 +3,6 @@ package desync import ( "context" "testing" - "time" "github.com/stretchr/testify/require" ) @@ -29,47 +28,9 @@ func TestCopy(t *testing.T) { chunks := make([]ChunkID, 1) chunks[0] = first_chunk_id - Copy(context.Background(), chunks, src_store, dst_store, 1, NewProgressBar(""),false,100) + Copy(context.Background(), chunks, src_store, dst_store, 1, NewProgressBar("")) require.NoError(t, err) has_the_chunk, _ := dst_store.HasChunk(first_chunk_id) require.True(t, has_the_chunk) } - -func TestTimeThrottle(t *testing.T) { - - // If the wait time is zero, we never wait - - wait := time.Duration(time.Millisecond * 0) - throttle := TimeThrottle{time.Now(), wait} - w, _ := throttle.calculateThrottle() - require.False(t, w) - - past := time.Now().Add(-time.Hour * 1) - throttle = TimeThrottle{past, wait} - w, _ = throttle.calculateThrottle() - require.False(t, w) - - wait = time.Duration(time.Hour * 1) - throttle = TimeThrottle{time.Now(), wait} - w, d := throttle.calculateThrottle() - require.True(t, w) - require.True(t, d > time.Duration(time.Minute*59)) - - // Assuming out last exection was in the past, we don't wait - past = time.Now().Add(-time.Hour * 1) - wait = time.Duration(time.Second * 60) - throttle = TimeThrottle{past, wait} - w, _ = throttle.calculateThrottle() - require.False(t, w) - - wait = time.Duration(time.Second * 60) - throttle = TimeThrottle{time.Now(), wait} - present := throttle.lastExecutionTime - // Without the sleep this can fail. At least on windows - // https://github.com/folbricht/desync/actions/runs/8131384060/job/22220648517?pr=258 - time.Sleep(time.Duration(time.Millisecond*100)) - throttle.reset() - future := throttle.lastExecutionTime - require.True(t, present.Before(future)) -} diff --git a/go.mod b/go.mod index 8e34698..8f6dac8 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( golang.org/x/oauth2 v0.7.0 // indirect golang.org/x/term v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect diff --git a/go.sum b/go.sum index 3dabb6c..55e3461 100644 --- a/go.sum +++ b/go.sum @@ -202,6 +202,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/ratelimitstore.go b/ratelimitstore.go new file mode 100644 index 0000000..bdd5900 --- /dev/null +++ b/ratelimitstore.go @@ -0,0 +1,63 @@ +package desync + +import ( + "context" + "fmt" + "time" + + "golang.org/x/time/rate" +) + +type ThrottleOptions struct { + eventRate float64 + burstRate int + timeout time.Duration +} + +type RateLimitedLocalStore struct { + + wrappedStore WriteStore + + limiter *rate.Limiter + + options ThrottleOptions + +} + +func NewRateLimitedLocalStore(s WriteStore, options ThrottleOptions) *RateLimitedLocalStore { + + limiter := rate.NewLimiter(rate.Limit(options.eventRate), options.burstRate) + return &RateLimitedLocalStore{wrappedStore: s,limiter: limiter } +} + +func (s RateLimitedLocalStore) GetChunk(id ChunkID) (*Chunk, error) { + + return s.wrappedStore.GetChunk(id) +} + +func (s RateLimitedLocalStore) HasChunk(id ChunkID) (bool, error) { + + + return s.wrappedStore.HasChunk(id) +} + + +func (s RateLimitedLocalStore) StoreChunk(chunk *Chunk) error { + ctx, cancel := context.WithTimeout(context.Background(), s.options.timeout) + defer cancel() + // This isn't ideal because what I'm really interested is in size over the wire. + _, err := chunk.Data() + if err != nil { + return err + } + + //size := len(b) + err = s.limiter.WaitN(ctx,1) + if err != nil { + + fmt.Println("Rate limit context error:", err) + } + + return s.wrappedStore.StoreChunk(chunk) + +} diff --git a/ratelimitstore_test.go b/ratelimitstore_test.go new file mode 100644 index 0000000..a2548ba --- /dev/null +++ b/ratelimitstore_test.go @@ -0,0 +1,58 @@ +package desync + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func TestLimiter(t *testing.T){ + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000) + defer cancel() + + limiter := rate.NewLimiter(rate.Limit(50), 50) + err := limiter.WaitN(ctx,1) + require.Nil(t, err) + + limiter = rate.NewLimiter(rate.Limit(2), 1) + _ = limiter.WaitN(ctx,1) + require.Nil(t, err) + err = limiter.WaitN(ctx,1) + require.Nil(t, err) + + limiter = rate.NewLimiter(rate.Limit(1), 1) + _ = limiter.WaitN(ctx,1) + require.Nil(t, err) + err = limiter.WaitN(ctx,1) + require.NotNil(t, err) + + + + +} + +func TestCopyWithNoLimit(t *testing.T) { + + src_store_dir := t.TempDir() + + src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) + require.NoError(t, err) + throttleOptions := ThrottleOptions{100,100,time.Millisecond*10000} + throttledStore := NewRateLimitedLocalStore(src_store, throttleOptions) + + chunk_data := []byte("some data") + chunk := NewChunk(chunk_data) + chunk_id := chunk.ID() + + err = throttledStore.StoreChunk(chunk) + require.NotNil(t,err) + hasChunk, err := throttledStore.HasChunk(chunk_id) + require.NotNil(t,err) + require.True(t,hasChunk) + + +} \ No newline at end of file From 0fc9e60fc76f1604a4e599fc4a764174028a5a0b Mon Sep 17 00:00:00 2001 From: bstrausser Date: Tue, 5 Mar 2024 16:46:33 -0500 Subject: [PATCH 04/11] Add more test + remove deadline for now --- ratelimitstore.go | 7 +++- ratelimitstore_test.go | 83 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 82 insertions(+), 8 deletions(-) diff --git a/ratelimitstore.go b/ratelimitstore.go index bdd5900..dbd0d7b 100644 --- a/ratelimitstore.go +++ b/ratelimitstore.go @@ -43,8 +43,7 @@ func (s RateLimitedLocalStore) HasChunk(id ChunkID) (bool, error) { func (s RateLimitedLocalStore) StoreChunk(chunk *Chunk) error { - ctx, cancel := context.WithTimeout(context.Background(), s.options.timeout) - defer cancel() + // This isn't ideal because what I'm really interested is in size over the wire. _, err := chunk.Data() if err != nil { @@ -52,10 +51,14 @@ func (s RateLimitedLocalStore) StoreChunk(chunk *Chunk) error { } //size := len(b) + ctx := context.Background() + //defer cancel() + err = s.limiter.WaitN(ctx,1) if err != nil { fmt.Println("Rate limit context error:", err) + return err } return s.wrappedStore.StoreChunk(chunk) diff --git a/ratelimitstore_test.go b/ratelimitstore_test.go index a2548ba..03e3322 100644 --- a/ratelimitstore_test.go +++ b/ratelimitstore_test.go @@ -2,6 +2,8 @@ package desync import ( "context" + "crypto/rand" + "fmt" "testing" "time" @@ -9,8 +11,22 @@ import ( "golang.org/x/time/rate" ) +func random_data() ([]byte,error){ + + b := make([]byte, 16) + _, err := rand.Read(b) + if err != nil { + fmt.Println("Error: ", err) + return b,err + } + + return b,nil +} + func TestLimiter(t *testing.T){ + // This is testing the framework. So not great, but I needed to get my head around what the relation + // between events and burst do ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000) defer cancel() @@ -39,20 +55,75 @@ func TestCopyWithNoLimit(t *testing.T) { src_store_dir := t.TempDir() + // assert our store is working src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) require.NoError(t, err) - throttleOptions := ThrottleOptions{100,100,time.Millisecond*10000} - throttledStore := NewRateLimitedLocalStore(src_store, throttleOptions) - + chunk_data := []byte("some data") chunk := NewChunk(chunk_data) - chunk_id := chunk.ID() + err = src_store.StoreChunk(chunk) + require.Nil(t,err) + + throttleOptions := ThrottleOptions{1,1,time.Second*60} + throttledStore := NewRateLimitedLocalStore(src_store, throttleOptions) + + + chunk_data = []byte("different data") + chunk = NewChunk(chunk_data) + chunk_id := chunk.ID() + err = throttledStore.StoreChunk(chunk) - require.NotNil(t,err) + require.Nil(t,err) hasChunk, err := throttledStore.HasChunk(chunk_id) - require.NotNil(t,err) + require.Nil(t,err) require.True(t,hasChunk) + start := time.Now() + // We start with 1 token in the bucket and replenish at 1 token per second + // This should take ~10 seconds. + // We test it takes 8s to guard against flakiness + for i := 0; i < 10; i++ { + err = throttledStore.StoreChunk(chunk) + // This test will eventually fail when I get deadlines enabled + require.Nil(t,err) + } + finish := time.Now() + + require.True(t, finish.Sub(start).Seconds() > 8) +} + +func TestForAFullBucketNoWait(t *testing.T) { + + src_store_dir := t.TempDir() + + // assert our store is working + src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) + require.NoError(t, err) + throttleOptions := ThrottleOptions{1,100,time.Second*60} + throttledStore := NewRateLimitedLocalStore(src_store, throttleOptions) + start := time.Now() + + + chunk_ids := make([]ChunkID, 10) + // The bucket is full, we shouldn't wait + for i := 0; i < 10; i++ { + + data,err := random_data() + require.NoError(t,err) + chunk := NewChunk(data) + chunk_ids[i] = chunk.ID() + err = throttledStore.StoreChunk(chunk) + require.Nil(t,err) + } + finish := time.Now() + require.True(t, finish.Sub(start).Seconds() < 2) + for i := 0; i < 10; i++ { + + has,err := throttledStore.HasChunk(chunk_ids[i]) + require.Nil(t,err) + require.True(t,has) + + } } \ No newline at end of file From 1ccde5bc3f97911409f58323c275a2e82f4a7a08 Mon Sep 17 00:00:00 2001 From: bstrausser Date: Tue, 5 Mar 2024 19:26:07 -0500 Subject: [PATCH 05/11] Cleanup some of the test code duplication --- ratelimitstore_test.go | 111 ++++++++++++++++++++++++----------------- 1 file changed, 65 insertions(+), 46 deletions(-) diff --git a/ratelimitstore_test.go b/ratelimitstore_test.go index 03e3322..0cecc48 100644 --- a/ratelimitstore_test.go +++ b/ratelimitstore_test.go @@ -11,6 +11,19 @@ import ( "golang.org/x/time/rate" ) + +func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int, timeout time.Duration) *RateLimitedLocalStore{ + + src_store_dir := t.TempDir() + src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) + require.NoError(t, err) + + throttleOptions := ThrottleOptions{eventRate,burstRate,timeout} + return NewRateLimitedLocalStore(src_store, throttleOptions) + + +} + func random_data() ([]byte,error){ b := make([]byte, 16) @@ -23,6 +36,32 @@ func random_data() ([]byte,error){ return b,nil } + +func storeLoop(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedLocalStore){ + + for i := 0; i < max; i++ { + + data,err := random_data() + require.NoError(t,err) + chunk := NewChunk(data) + chunk_ids[i] = chunk.ID() + err = store.StoreChunk(chunk) + require.Nil(t,err) + } + + +} + +func chunkCheck(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedLocalStore) { + for i := 0; i < max; i++ { + + has,err := store.HasChunk(chunk_ids[i]) + require.Nil(t,err) + require.True(t,has) + + } +} + func TestLimiter(t *testing.T){ // This is testing the framework. So not great, but I needed to get my head around what the relation @@ -53,38 +92,18 @@ func TestLimiter(t *testing.T){ func TestCopyWithNoLimit(t *testing.T) { - src_store_dir := t.TempDir() - - // assert our store is working - src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) - require.NoError(t, err) - chunk_data := []byte("some data") + throttledStore := NewTestRateLimitedLocalStore(t,1,1,time.Second*60) + chunk_data := []byte("different data") chunk := NewChunk(chunk_data) - err = src_store.StoreChunk(chunk) - require.Nil(t,err) - - throttleOptions := ThrottleOptions{1,1,time.Second*60} - throttledStore := NewRateLimitedLocalStore(src_store, throttleOptions) - - chunk_data = []byte("different data") - chunk = NewChunk(chunk_data) - chunk_id := chunk.ID() - - err = throttledStore.StoreChunk(chunk) - require.Nil(t,err) - hasChunk, err := throttledStore.HasChunk(chunk_id) - require.Nil(t,err) - require.True(t,hasChunk) - start := time.Now() // We start with 1 token in the bucket and replenish at 1 token per second // This should take ~10 seconds. // We test it takes 8s to guard against flakiness for i := 0; i < 10; i++ { - err = throttledStore.StoreChunk(chunk) + err := throttledStore.StoreChunk(chunk) // This test will eventually fail when I get deadlines enabled require.Nil(t,err) } @@ -93,37 +112,37 @@ func TestCopyWithNoLimit(t *testing.T) { require.True(t, finish.Sub(start).Seconds() > 8) } + func TestForAFullBucketNoWait(t *testing.T) { - src_store_dir := t.TempDir() + chunk_count := 10 + // Bucket has initial size chunk_count + throttledStore := NewTestRateLimitedLocalStore(t,1,chunk_count + 1,time.Second*60) - // assert our store is working - src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) - require.NoError(t, err) - throttleOptions := ThrottleOptions{1,100,time.Second*60} - throttledStore := NewRateLimitedLocalStore(src_store, throttleOptions) + chunk_ids := make([]ChunkID, chunk_count) + start := time.Now() + // The bucket is full, we shouldn't have to wait + storeLoop(t,chunk_count,chunk_ids,*throttledStore) + finish := time.Now() + require.True(t, finish.Sub(start).Seconds() < 2) + chunkCheck(t,chunk_count,chunk_ids,*throttledStore) +} +func TestForAFastReplenishmentRateLittleWait(t *testing.T) { + + chunk_count := 10 + // Bucket only has one, but we replenish chunk_count tokens every second + throttledStore := NewTestRateLimitedLocalStore(t,float64( chunk_count + 1),1,time.Second*60) + start := time.Now() - chunk_ids := make([]ChunkID, 10) - // The bucket is full, we shouldn't wait - for i := 0; i < 10; i++ { - - data,err := random_data() - require.NoError(t,err) - chunk := NewChunk(data) - chunk_ids[i] = chunk.ID() - err = throttledStore.StoreChunk(chunk) - require.Nil(t,err) - } + chunk_ids := make([]ChunkID, chunk_count) + storeLoop(t,chunk_count,chunk_ids,*throttledStore) + finish := time.Now() require.True(t, finish.Sub(start).Seconds() < 2) - for i := 0; i < 10; i++ { + chunkCheck(t,chunk_count,chunk_ids,*throttledStore) - has,err := throttledStore.HasChunk(chunk_ids[i]) - require.Nil(t,err) - require.True(t,has) - - } + } \ No newline at end of file From 0702186328e2ab93bf65fa9a853781efb339e4a0 Mon Sep 17 00:00:00 2001 From: bstrausser Date: Tue, 5 Mar 2024 21:13:48 -0500 Subject: [PATCH 06/11] Fix cache copy call --- cmd/desync/cache.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/desync/cache.go b/cmd/desync/cache.go index 53b6b4c..95d6136 100644 --- a/cmd/desync/cache.go +++ b/cmd/desync/cache.go @@ -44,7 +44,6 @@ file with --ignore-chunks .`, flags.StringVarP(&opt.cache, "cache", "c", "", "target store") flags.StringSliceVarP(&opt.ignoreIndexes, "ignore", "", nil, "index(s) to ignore chunks from") flags.StringSliceVarP(&opt.ignoreChunks, "ignore-chunks", "", nil, "ignore chunks from text file") - flags.IntVarP(&opt.throttleRateMillis, "throttle-rate-millis","",0,"throttle copy in millis") addStoreOptions(&opt.cmdStoreOptions, flags) return cmd } @@ -118,5 +117,5 @@ func runCache(ctx context.Context, opt cacheOptions, args []string) error { pb := desync.NewProgressBar("") // Pull all the chunks, and load them into the cache in the process - return desync.Copy(ctx, ids, s, dst, opt.n, pb, opt.throttleRateMillis > 0, opt.throttleRateMillis) + return desync.Copy(ctx, ids, s, dst, opt.n, pb) } From dd156c5c68de81625b052e9fbf25298b4f53574d Mon Sep 17 00:00:00 2001 From: bstrausser Date: Thu, 7 Mar 2024 17:35:24 -0500 Subject: [PATCH 07/11] Add in timeouts --- ratelimitstore.go | 21 ++++++++++---- ratelimitstore_test.go | 66 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 73 insertions(+), 14 deletions(-) diff --git a/ratelimitstore.go b/ratelimitstore.go index dbd0d7b..787ee13 100644 --- a/ratelimitstore.go +++ b/ratelimitstore.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/pkg/errors" "golang.org/x/time/rate" ) @@ -12,6 +13,7 @@ type ThrottleOptions struct { eventRate float64 burstRate int timeout time.Duration + immediateOrFail bool } type RateLimitedLocalStore struct { @@ -24,10 +26,12 @@ type RateLimitedLocalStore struct { } +var RateLimitedExceeded = errors.New("Rate Limit Exceeded") + func NewRateLimitedLocalStore(s WriteStore, options ThrottleOptions) *RateLimitedLocalStore { limiter := rate.NewLimiter(rate.Limit(options.eventRate), options.burstRate) - return &RateLimitedLocalStore{wrappedStore: s,limiter: limiter } + return &RateLimitedLocalStore{wrappedStore: s,limiter: limiter, options: options } } func (s RateLimitedLocalStore) GetChunk(id ChunkID) (*Chunk, error) { @@ -51,14 +55,21 @@ func (s RateLimitedLocalStore) StoreChunk(chunk *Chunk) error { } //size := len(b) - ctx := context.Background() - //defer cancel() + ctx, cancel:= context.WithTimeout(context.Background(), s.options.timeout) + defer cancel() + + if s.options.immediateOrFail{ + if !s.limiter.AllowN(time.Now(),1){ + err = errors.New("Unable to immediately store") + } + } else{ + err = s.limiter.WaitN(ctx,1) + } - err = s.limiter.WaitN(ctx,1) if err != nil { fmt.Println("Rate limit context error:", err) - return err + return RateLimitedExceeded } return s.wrappedStore.StoreChunk(chunk) diff --git a/ratelimitstore_test.go b/ratelimitstore_test.go index 0cecc48..2566e6e 100644 --- a/ratelimitstore_test.go +++ b/ratelimitstore_test.go @@ -3,6 +3,7 @@ package desync import ( "context" "crypto/rand" + "errors" "fmt" "testing" "time" @@ -12,15 +13,16 @@ import ( ) -func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int, timeout time.Duration) *RateLimitedLocalStore{ +func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int, timeout time.Duration, immediateOrFail bool) *RateLimitedLocalStore{ src_store_dir := t.TempDir() src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) require.NoError(t, err) - throttleOptions := ThrottleOptions{eventRate,burstRate,timeout} - return NewRateLimitedLocalStore(src_store, throttleOptions) - + throttleOptions := ThrottleOptions{eventRate,burstRate,timeout,immediateOrFail} + store :=NewRateLimitedLocalStore(src_store, throttleOptions) + require.Equal(t,store.options.burstRate,burstRate ) + return store } @@ -93,8 +95,8 @@ func TestLimiter(t *testing.T){ func TestCopyWithNoLimit(t *testing.T) { - throttledStore := NewTestRateLimitedLocalStore(t,1,1,time.Second*60) - chunk_data := []byte("different data") + throttledStore := NewTestRateLimitedLocalStore(t,1,1,time.Second*60, false) + chunk_data := []byte("different datas") chunk := NewChunk(chunk_data) @@ -117,7 +119,7 @@ func TestForAFullBucketNoWait(t *testing.T) { chunk_count := 10 // Bucket has initial size chunk_count - throttledStore := NewTestRateLimitedLocalStore(t,1,chunk_count + 1,time.Second*60) + throttledStore := NewTestRateLimitedLocalStore(t,1,chunk_count + 1,time.Second*60, false) chunk_ids := make([]ChunkID, chunk_count) start := time.Now() @@ -131,8 +133,8 @@ func TestForAFullBucketNoWait(t *testing.T) { func TestForAFastReplenishmentRateLittleWait(t *testing.T) { chunk_count := 10 - // Bucket only has one, but we replenish chunk_count tokens every second - throttledStore := NewTestRateLimitedLocalStore(t,float64( chunk_count + 1),1,time.Second*60) + // Bucket only has one token, but we replenish chunk_count tokens every second + throttledStore := NewTestRateLimitedLocalStore(t,float64( chunk_count + 1),1,time.Second*60,false) start := time.Now() @@ -145,4 +147,50 @@ func TestForAFastReplenishmentRateLittleWait(t *testing.T) { chunkCheck(t,chunk_count,chunk_ids,*throttledStore) +} + +func TestTimeout(t *testing.T) { + + // Bucket only has one token, and we replenish very slowly. We timeout, so second invocation will fail + throttledStore := NewTestRateLimitedLocalStore(t,float64(1) /100,1,time.Millisecond*1000, false) + + + + data,err := random_data() + require.NoError(t,err) + chunk := NewChunk(data) + err = throttledStore.StoreChunk(chunk) + require.Nil(t,err) + err = throttledStore.StoreChunk(chunk) + require.NotNil(t,err) + require.True(t, errors.Is(err,RateLimitedExceeded)) +} + +func TestNoTimeout(t *testing.T) { + chunk_count := 10 + // Bucket only has one token, replenish 1 per second. Timeout is 11 seconds. + throttledStore := NewTestRateLimitedLocalStore(t,1,1,time.Second*11, false) + + + chunk_ids := make([]ChunkID, chunk_count) + storeLoop(t,chunk_count,chunk_ids,*throttledStore) +} + +func TestImmediateOrFail(t *testing.T) { + + // Bucket only has one token, and we replenish very slowly. Second invocation will fail + throttledStore := NewTestRateLimitedLocalStore(t,float64(1) /100,1,time.Second*60, true) + + + + data,err := random_data() + require.NoError(t,err) + chunk := NewChunk(data) + + err = throttledStore.StoreChunk(chunk) + require.Nil(t,err) + + err = throttledStore.StoreChunk(chunk) + require.NotNil(t,err) + } \ No newline at end of file From 56497cc52e8cc0494f25a47247b1da72bd5b1c3e Mon Sep 17 00:00:00 2001 From: bstrausser Date: Sun, 10 Mar 2024 15:41:54 -0400 Subject: [PATCH 08/11] Add throttle /w tests on Reads --- ratelimitstore.go | 34 ++++++++++++++++----- ratelimitstore_test.go | 67 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 84 insertions(+), 17 deletions(-) diff --git a/ratelimitstore.go b/ratelimitstore.go index 787ee13..f64787e 100644 --- a/ratelimitstore.go +++ b/ratelimitstore.go @@ -16,7 +16,8 @@ type ThrottleOptions struct { immediateOrFail bool } -type RateLimitedLocalStore struct { + +type RateLimitedStore struct { wrappedStore WriteStore @@ -28,25 +29,42 @@ type RateLimitedLocalStore struct { var RateLimitedExceeded = errors.New("Rate Limit Exceeded") -func NewRateLimitedLocalStore(s WriteStore, options ThrottleOptions) *RateLimitedLocalStore { + +func NewRateLimitedStore(s WriteStore, options ThrottleOptions) *RateLimitedStore { limiter := rate.NewLimiter(rate.Limit(options.eventRate), options.burstRate) - return &RateLimitedLocalStore{wrappedStore: s,limiter: limiter, options: options } + return &RateLimitedStore{wrappedStore: s,limiter: limiter, options: options } } -func (s RateLimitedLocalStore) GetChunk(id ChunkID) (*Chunk, error) { +func (s RateLimitedStore) GetChunk(id ChunkID) (*Chunk, error) { - return s.wrappedStore.GetChunk(id) + chunk,err := s.wrappedStore.GetChunk(id) + if err != nil{ + return chunk, err + } + ctx, cancel:= context.WithTimeout(context.Background(), s.options.timeout) + defer cancel() + err = s.limiter.WaitN(ctx,1) + return chunk, err } -func (s RateLimitedLocalStore) HasChunk(id ChunkID) (bool, error) { +func (s RateLimitedStore) HasChunk(id ChunkID) (bool, error) { + - return s.wrappedStore.HasChunk(id) + has,err := s.wrappedStore.HasChunk(id) + if err != nil{ + return has, err + } + ctx, cancel:= context.WithTimeout(context.Background(), s.options.timeout) + defer cancel() + err = s.limiter.WaitN(ctx,1) + return has, err + } -func (s RateLimitedLocalStore) StoreChunk(chunk *Chunk) error { +func (s RateLimitedStore) StoreChunk(chunk *Chunk) error { // This isn't ideal because what I'm really interested is in size over the wire. _, err := chunk.Data() diff --git a/ratelimitstore_test.go b/ratelimitstore_test.go index 2566e6e..effb0f7 100644 --- a/ratelimitstore_test.go +++ b/ratelimitstore_test.go @@ -13,14 +13,14 @@ import ( ) -func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int, timeout time.Duration, immediateOrFail bool) *RateLimitedLocalStore{ +func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int, timeout time.Duration, immediateOrFail bool) *RateLimitedStore{ src_store_dir := t.TempDir() src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) require.NoError(t, err) throttleOptions := ThrottleOptions{eventRate,burstRate,timeout,immediateOrFail} - store :=NewRateLimitedLocalStore(src_store, throttleOptions) + store :=NewRateLimitedStore(src_store, throttleOptions) require.Equal(t,store.options.burstRate,burstRate ) return store @@ -38,23 +38,28 @@ func random_data() ([]byte,error){ return b,nil } +func makeChunk(t *testing.T,) *Chunk{ + data,err := random_data() + require.NoError(t,err) + chunk := NewChunk(data) + return chunk +} -func storeLoop(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedLocalStore){ +func storeLoop(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedStore){ for i := 0; i < max; i++ { - data,err := random_data() - require.NoError(t,err) - chunk := NewChunk(data) + + chunk := makeChunk(t) chunk_ids[i] = chunk.ID() - err = store.StoreChunk(chunk) + err := store.StoreChunk(chunk) require.Nil(t,err) } } -func chunkCheck(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedLocalStore) { +func chunkCheck(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedStore) { for i := 0; i < max; i++ { has,err := store.HasChunk(chunk_ids[i]) @@ -193,4 +198,48 @@ func TestImmediateOrFail(t *testing.T) { err = throttledStore.StoreChunk(chunk) require.NotNil(t,err) -} \ No newline at end of file +} + +func TestHasNoChunk(t *testing.T) { + + throttledStore := NewTestRateLimitedLocalStore(t,2,2,time.Second*11, false) + chunk := makeChunk(t) + has, err := throttledStore.HasChunk(chunk.ID()) + require.Nil(t,err) + require.False(t, has) + +} + +func TestStoresAndHasChunk(t *testing.T) { + + throttledStore := NewTestRateLimitedLocalStore(t,2,2,time.Second*1, false) + chunk := makeChunk(t) + err := throttledStore.StoreChunk(chunk) + require.Nil(t,err) + has, err := throttledStore.HasChunk(chunk.ID()) + require.Nil(t,err) + require.True(t, has) + +} + + +func TestStoresAndHasChunkWithWaits(t *testing.T) { + + // Start with 1 token, replenish at 1 token per second. Consume 1 token per HasToken, should take ~5s + throttledStore := NewTestRateLimitedLocalStore(t,float64(1),1,time.Second*10, false) + chunk := makeChunk(t) + start := time.Now() + err := throttledStore.StoreChunk(chunk) + require.Nil(t,err) + count:= 0 + for count < 5{ + count++ + has, err := throttledStore.HasChunk(chunk.ID()) + require.Nil(t,err) + require.True(t, has) + } + finish := time.Now() + require.True(t, finish.Sub(start).Seconds() < 7) + require.True(t, finish.Sub(start).Seconds() > 3) + +} From 3560177b09f7a72b12cfac10ff5604084d00e5c7 Mon Sep 17 00:00:00 2001 From: bstrausser Date: Sun, 17 Mar 2024 17:19:19 -0400 Subject: [PATCH 09/11] Remove timeout + add http tests --- ratelimitstore.go | 33 +++++------ ratelimitstore_test.go | 131 +++++++++++++++++++++++++++-------------- 2 files changed, 102 insertions(+), 62 deletions(-) diff --git a/ratelimitstore.go b/ratelimitstore.go index f64787e..333fd54 100644 --- a/ratelimitstore.go +++ b/ratelimitstore.go @@ -3,7 +3,6 @@ package desync import ( "context" "fmt" - "time" "github.com/pkg/errors" "golang.org/x/time/rate" @@ -12,8 +11,8 @@ import ( type ThrottleOptions struct { eventRate float64 burstRate int - timeout time.Duration - immediateOrFail bool + + } @@ -42,8 +41,8 @@ func (s RateLimitedStore) GetChunk(id ChunkID) (*Chunk, error) { if err != nil{ return chunk, err } - ctx, cancel:= context.WithTimeout(context.Background(), s.options.timeout) - defer cancel() + ctx := context.Background() + err = s.limiter.WaitN(ctx,1) return chunk, err } @@ -56,8 +55,7 @@ func (s RateLimitedStore) HasChunk(id ChunkID) (bool, error) { if err != nil{ return has, err } - ctx, cancel:= context.WithTimeout(context.Background(), s.options.timeout) - defer cancel() + ctx :=context.Background() err = s.limiter.WaitN(ctx,1) return has, err @@ -72,18 +70,12 @@ func (s RateLimitedStore) StoreChunk(chunk *Chunk) error { return err } - //size := len(b) - ctx, cancel:= context.WithTimeout(context.Background(), s.options.timeout) - defer cancel() - if s.options.immediateOrFail{ - if !s.limiter.AllowN(time.Now(),1){ - err = errors.New("Unable to immediately store") - } - } else{ - err = s.limiter.WaitN(ctx,1) - } + ctx := context.Background() + + + err = s.limiter.WaitN(ctx,1) if err != nil { fmt.Println("Rate limit context error:", err) @@ -93,3 +85,10 @@ func (s RateLimitedStore) StoreChunk(chunk *Chunk) error { return s.wrappedStore.StoreChunk(chunk) } + +func (s RateLimitedStore) String() string { + return s.wrappedStore.String() +} + + +func (s RateLimitedStore) Close() error { return nil } diff --git a/ratelimitstore_test.go b/ratelimitstore_test.go index effb0f7..1398403 100644 --- a/ratelimitstore_test.go +++ b/ratelimitstore_test.go @@ -3,8 +3,10 @@ package desync import ( "context" "crypto/rand" - "errors" "fmt" + "net/http/httptest" + "net/url" + "sync" "testing" "time" @@ -13,13 +15,13 @@ import ( ) -func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int, timeout time.Duration, immediateOrFail bool) *RateLimitedStore{ +func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int) *RateLimitedStore{ src_store_dir := t.TempDir() src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) require.NoError(t, err) - throttleOptions := ThrottleOptions{eventRate,burstRate,timeout,immediateOrFail} + throttleOptions := ThrottleOptions{eventRate,burstRate} store :=NewRateLimitedStore(src_store, throttleOptions) require.Equal(t,store.options.burstRate,burstRate ) return store @@ -100,7 +102,7 @@ func TestLimiter(t *testing.T){ func TestCopyWithNoLimit(t *testing.T) { - throttledStore := NewTestRateLimitedLocalStore(t,1,1,time.Second*60, false) + throttledStore := NewTestRateLimitedLocalStore(t,1,1) chunk_data := []byte("different datas") chunk := NewChunk(chunk_data) @@ -124,7 +126,7 @@ func TestForAFullBucketNoWait(t *testing.T) { chunk_count := 10 // Bucket has initial size chunk_count - throttledStore := NewTestRateLimitedLocalStore(t,1,chunk_count + 1,time.Second*60, false) + throttledStore := NewTestRateLimitedLocalStore(t,1,chunk_count + 1) chunk_ids := make([]ChunkID, chunk_count) start := time.Now() @@ -139,7 +141,7 @@ func TestForAFastReplenishmentRateLittleWait(t *testing.T) { chunk_count := 10 // Bucket only has one token, but we replenish chunk_count tokens every second - throttledStore := NewTestRateLimitedLocalStore(t,float64( chunk_count + 1),1,time.Second*60,false) + throttledStore := NewTestRateLimitedLocalStore(t,float64( chunk_count + 1),1) start := time.Now() @@ -154,55 +156,21 @@ func TestForAFastReplenishmentRateLittleWait(t *testing.T) { } -func TestTimeout(t *testing.T) { - - // Bucket only has one token, and we replenish very slowly. We timeout, so second invocation will fail - throttledStore := NewTestRateLimitedLocalStore(t,float64(1) /100,1,time.Millisecond*1000, false) - - - - data,err := random_data() - require.NoError(t,err) - chunk := NewChunk(data) - err = throttledStore.StoreChunk(chunk) - require.Nil(t,err) - err = throttledStore.StoreChunk(chunk) - require.NotNil(t,err) - require.True(t, errors.Is(err,RateLimitedExceeded)) -} func TestNoTimeout(t *testing.T) { chunk_count := 10 // Bucket only has one token, replenish 1 per second. Timeout is 11 seconds. - throttledStore := NewTestRateLimitedLocalStore(t,1,1,time.Second*11, false) + throttledStore := NewTestRateLimitedLocalStore(t,1,1) chunk_ids := make([]ChunkID, chunk_count) storeLoop(t,chunk_count,chunk_ids,*throttledStore) } -func TestImmediateOrFail(t *testing.T) { - - // Bucket only has one token, and we replenish very slowly. Second invocation will fail - throttledStore := NewTestRateLimitedLocalStore(t,float64(1) /100,1,time.Second*60, true) - - - - data,err := random_data() - require.NoError(t,err) - chunk := NewChunk(data) - - err = throttledStore.StoreChunk(chunk) - require.Nil(t,err) - - err = throttledStore.StoreChunk(chunk) - require.NotNil(t,err) - -} func TestHasNoChunk(t *testing.T) { - throttledStore := NewTestRateLimitedLocalStore(t,2,2,time.Second*11, false) + throttledStore := NewTestRateLimitedLocalStore(t,2,2) chunk := makeChunk(t) has, err := throttledStore.HasChunk(chunk.ID()) require.Nil(t,err) @@ -212,7 +180,7 @@ func TestHasNoChunk(t *testing.T) { func TestStoresAndHasChunk(t *testing.T) { - throttledStore := NewTestRateLimitedLocalStore(t,2,2,time.Second*1, false) + throttledStore := NewTestRateLimitedLocalStore(t,2,2) chunk := makeChunk(t) err := throttledStore.StoreChunk(chunk) require.Nil(t,err) @@ -225,8 +193,8 @@ func TestStoresAndHasChunk(t *testing.T) { func TestStoresAndHasChunkWithWaits(t *testing.T) { - // Start with 1 token, replenish at 1 token per second. Consume 1 token per HasToken, should take ~5s - throttledStore := NewTestRateLimitedLocalStore(t,float64(1),1,time.Second*10, false) + // Start with 1 token, replenish at 1 token per second. Consume 1 token per HasChunk, should take ~5s + throttledStore := NewTestRateLimitedLocalStore(t,float64(1),1) chunk := makeChunk(t) start := time.Now() err := throttledStore.StoreChunk(chunk) @@ -243,3 +211,76 @@ func TestStoresAndHasChunkWithWaits(t *testing.T) { require.True(t, finish.Sub(start).Seconds() > 3) } + +func TestHTTPHandlerReadWriteWithThrottle(t *testing.T) { + + + tests:= map[string]struct { + eventRate float64 + burstRate int + minTime float64 + maxTime float64 + ops int + readers int + } { + + "full bucket" : {10,220,0,5,100,100}, + "bucket with 50 tokens and a 10 t/s replenishment rate" : {10,50,13,17,100,100}, + "bucket with 1 tokens and a 0.2 t/s replenishment rate" : {0.2,1,45,55,5,5}, + + } + + for name, test := range tests { + t.Run(name,func(t *testing.T) { + + + + upstream:= NewTestRateLimitedLocalStore(t,test.eventRate,test.burstRate) + rw := httptest.NewServer(NewHTTPHandler(upstream, true, false, []converter{Compressor{}}, "")) + defer rw.Close() + + chunkIdChan := make(chan ChunkID,test.ops) + defer close(chunkIdChan) + + var wg sync.WaitGroup + rwStoreURL, _ := url.Parse(rw.URL) + start := time.Now() + for i:=0; i < test.readers; i++{ + + + go func () { + defer wg.Done() + + rwStore, err := NewRemoteHTTPStore(rwStoreURL, StoreOptions{}) + require.NoError(t, err) + dataIn := []byte("some data") + chunkIn := NewChunk(dataIn) + err = rwStore.StoreChunk(chunkIn) + require.NoError(t, err) + chunkIdChan <- chunkIn.ID() + + }() + wg.Add(1) + go func () { + defer wg.Done() + rwStore, err := NewRemoteHTTPStore(rwStoreURL, StoreOptions{}) + require.NoError(t, err) + id := <- chunkIdChan + hasChunk, err := rwStore.HasChunk(id) + require.NoError(t, err) + require.True(t, hasChunk) + + }() + wg.Add(1) + } + + wg.Wait() + + finish := time.Now() + diff := finish.Sub(start).Seconds() + require.True(t, diff < test.maxTime) + require.True(t, diff > test.minTime) + }) +} + +} From 681a6f5ad04d9ef7a1cc4a1099e873f7daf5ea69 Mon Sep 17 00:00:00 2001 From: bstrausser Date: Mon, 18 Mar 2024 00:11:12 -0400 Subject: [PATCH 10/11] Add integration tests with copy --- ratelimitstore_test.go | 54 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 4 deletions(-) diff --git a/ratelimitstore_test.go b/ratelimitstore_test.go index 1398403..dc78c6f 100644 --- a/ratelimitstore_test.go +++ b/ratelimitstore_test.go @@ -61,8 +61,8 @@ func storeLoop(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedStor } -func chunkCheck(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedStore) { - for i := 0; i < max; i++ { +func chunkCheck(t *testing.T, chunk_ids []ChunkID, store RateLimitedStore) { + for i := 0; i < len(chunk_ids); i++ { has,err := store.HasChunk(chunk_ids[i]) require.Nil(t,err) @@ -134,7 +134,7 @@ func TestForAFullBucketNoWait(t *testing.T) { storeLoop(t,chunk_count,chunk_ids,*throttledStore) finish := time.Now() require.True(t, finish.Sub(start).Seconds() < 2) - chunkCheck(t,chunk_count,chunk_ids,*throttledStore) + chunkCheck(t,chunk_ids,*throttledStore) } func TestForAFastReplenishmentRateLittleWait(t *testing.T) { @@ -151,7 +151,7 @@ func TestForAFastReplenishmentRateLittleWait(t *testing.T) { finish := time.Now() require.True(t, finish.Sub(start).Seconds() < 2) - chunkCheck(t,chunk_count,chunk_ids,*throttledStore) + chunkCheck(t,chunk_ids,*throttledStore) } @@ -284,3 +284,49 @@ func TestHTTPHandlerReadWriteWithThrottle(t *testing.T) { } } + +func TestWithCopy(t *testing.T){ + + tests := map[string]struct{ + + chunkCount int + eventRate float64 + burstRate int + minTime float64 + maxTime float64 + }{ + "empty-ish buckets 10 token per second" : {10,10,1,0,5}, + "full buckets one token per second" : {100,1,1000,0,5}, + "100 token buckets 500-hundred token per second" : {1000,500,100,6,9}, + + + } + + for name, test := range tests{ + t.Run(name,func(t *testing.T) { + + + + chunkCount := test.chunkCount + chunkIds := make([]ChunkID, chunkCount) + remote := NewTestRateLimitedLocalStore(t,test.eventRate,int(test.burstRate)) + local := NewTestRateLimitedLocalStore(t,test.eventRate,int(test.burstRate)) + ctxt := context.Background() + pb := NewProgressBar("") + start := time.Now() + // This will consume chunkCount ops + storeLoop(t,chunkCount,chunkIds,*remote) + + // This will consume chunkCount*4 tokens + err := Copy(ctxt,chunkIds,remote,local,10, pb) + chunkCheck(t,chunkIds,*local) + require.Nil(t,err) + finish := time.Now() + diff := finish.Sub(start).Seconds() + require.True(t, diff < test.maxTime) + require.True(t, diff > test.minTime) + + }) + } + +} \ No newline at end of file From eb52eb8866629f8c6514c87cf41bd088ed467162 Mon Sep 17 00:00:00 2001 From: bstrausser Date: Wed, 10 Apr 2024 14:27:21 -0400 Subject: [PATCH 11/11] Remove test case --- ratelimitstore_test.go | 322 +++++++++++++++++++---------------------- 1 file changed, 148 insertions(+), 174 deletions(-) diff --git a/ratelimitstore_test.go b/ratelimitstore_test.go index dc78c6f..db6bf8e 100644 --- a/ratelimitstore_test.go +++ b/ratelimitstore_test.go @@ -14,64 +14,61 @@ import ( "golang.org/x/time/rate" ) - -func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int) *RateLimitedStore{ +func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int) *RateLimitedStore { src_store_dir := t.TempDir() src_store, err := NewLocalStore(src_store_dir, StoreOptions{}) require.NoError(t, err) - - throttleOptions := ThrottleOptions{eventRate,burstRate} - store :=NewRateLimitedStore(src_store, throttleOptions) - require.Equal(t,store.options.burstRate,burstRate ) + + throttleOptions := ThrottleOptions{eventRate, burstRate} + store := NewRateLimitedStore(src_store, throttleOptions) + require.Equal(t, store.options.burstRate, burstRate) return store } -func random_data() ([]byte,error){ +func random_data() ([]byte, error) { - b := make([]byte, 16) - _, err := rand.Read(b) - if err != nil { - fmt.Println("Error: ", err) - return b,err - } + b := make([]byte, 16) + _, err := rand.Read(b) + if err != nil { + fmt.Println("Error: ", err) + return b, err + } - return b,nil + return b, nil } -func makeChunk(t *testing.T,) *Chunk{ - data,err := random_data() - require.NoError(t,err) +func makeChunk(t *testing.T) *Chunk { + data, err := random_data() + require.NoError(t, err) chunk := NewChunk(data) return chunk } -func storeLoop(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedStore){ - +func storeLoop(t *testing.T, max int, chunk_ids []ChunkID, store RateLimitedStore) { + for i := 0; i < max; i++ { - - + chunk := makeChunk(t) chunk_ids[i] = chunk.ID() - err := store.StoreChunk(chunk) - require.Nil(t,err) + err := store.StoreChunk(chunk) + require.Nil(t, err) } - } -func chunkCheck(t *testing.T, chunk_ids []ChunkID, store RateLimitedStore) { +func chunkCheck(t *testing.T, chunk_ids []ChunkID, store RateLimitedStore) { for i := 0; i < len(chunk_ids); i++ { - has,err := store.HasChunk(chunk_ids[i]) - require.Nil(t,err) - require.True(t,has) + has, err := store.HasChunk(chunk_ids[i]) + require.Nil(t, err) + require.True(t, has) } } -func TestLimiter(t *testing.T){ +func TestLimiter(t *testing.T) { // This is testing the framework. So not great, but I needed to get my head around what the relation // between events and burst do @@ -79,254 +76,231 @@ func TestLimiter(t *testing.T){ defer cancel() limiter := rate.NewLimiter(rate.Limit(50), 50) - err := limiter.WaitN(ctx,1) + err := limiter.WaitN(ctx, 1) require.Nil(t, err) limiter = rate.NewLimiter(rate.Limit(2), 1) - _ = limiter.WaitN(ctx,1) + _ = limiter.WaitN(ctx, 1) require.Nil(t, err) - err = limiter.WaitN(ctx,1) + err = limiter.WaitN(ctx, 1) require.Nil(t, err) limiter = rate.NewLimiter(rate.Limit(1), 1) - _ = limiter.WaitN(ctx,1) + _ = limiter.WaitN(ctx, 1) require.Nil(t, err) - err = limiter.WaitN(ctx,1) + err = limiter.WaitN(ctx, 1) require.NotNil(t, err) - - - } func TestCopyWithNoLimit(t *testing.T) { - - throttledStore := NewTestRateLimitedLocalStore(t,1,1) + throttledStore := NewTestRateLimitedLocalStore(t, 1, 1) chunk_data := []byte("different datas") chunk := NewChunk(chunk_data) - start := time.Now() // We start with 1 token in the bucket and replenish at 1 token per second // This should take ~10 seconds. // We test it takes 8s to guard against flakiness for i := 0; i < 10; i++ { - err := throttledStore.StoreChunk(chunk) + err := throttledStore.StoreChunk(chunk) // This test will eventually fail when I get deadlines enabled - require.Nil(t,err) + require.Nil(t, err) } finish := time.Now() require.True(t, finish.Sub(start).Seconds() > 8) } - func TestForAFullBucketNoWait(t *testing.T) { chunk_count := 10 // Bucket has initial size chunk_count - throttledStore := NewTestRateLimitedLocalStore(t,1,chunk_count + 1) + throttledStore := NewTestRateLimitedLocalStore(t, 1, chunk_count+1) chunk_ids := make([]ChunkID, chunk_count) start := time.Now() // The bucket is full, we shouldn't have to wait - storeLoop(t,chunk_count,chunk_ids,*throttledStore) + storeLoop(t, chunk_count, chunk_ids, *throttledStore) finish := time.Now() require.True(t, finish.Sub(start).Seconds() < 2) - chunkCheck(t,chunk_ids,*throttledStore) + chunkCheck(t, chunk_ids, *throttledStore) } func TestForAFastReplenishmentRateLittleWait(t *testing.T) { chunk_count := 10 // Bucket only has one token, but we replenish chunk_count tokens every second - throttledStore := NewTestRateLimitedLocalStore(t,float64( chunk_count + 1),1) - + throttledStore := NewTestRateLimitedLocalStore(t, float64(chunk_count+1), 1) + start := time.Now() - chunk_ids := make([]ChunkID, chunk_count) - storeLoop(t,chunk_count,chunk_ids,*throttledStore) - + storeLoop(t, chunk_count, chunk_ids, *throttledStore) + finish := time.Now() require.True(t, finish.Sub(start).Seconds() < 2) - chunkCheck(t,chunk_ids,*throttledStore) + chunkCheck(t, chunk_ids, *throttledStore) - } - func TestNoTimeout(t *testing.T) { chunk_count := 10 // Bucket only has one token, replenish 1 per second. Timeout is 11 seconds. - throttledStore := NewTestRateLimitedLocalStore(t,1,1) - + throttledStore := NewTestRateLimitedLocalStore(t, 1, 1) chunk_ids := make([]ChunkID, chunk_count) - storeLoop(t,chunk_count,chunk_ids,*throttledStore) + storeLoop(t, chunk_count, chunk_ids, *throttledStore) } - func TestHasNoChunk(t *testing.T) { - - throttledStore := NewTestRateLimitedLocalStore(t,2,2) + + throttledStore := NewTestRateLimitedLocalStore(t, 2, 2) chunk := makeChunk(t) has, err := throttledStore.HasChunk(chunk.ID()) - require.Nil(t,err) + require.Nil(t, err) require.False(t, has) - + } func TestStoresAndHasChunk(t *testing.T) { - - throttledStore := NewTestRateLimitedLocalStore(t,2,2) + + throttledStore := NewTestRateLimitedLocalStore(t, 2, 2) chunk := makeChunk(t) err := throttledStore.StoreChunk(chunk) - require.Nil(t,err) + require.Nil(t, err) has, err := throttledStore.HasChunk(chunk.ID()) - require.Nil(t,err) + require.Nil(t, err) require.True(t, has) - -} +} func TestStoresAndHasChunkWithWaits(t *testing.T) { - + // Start with 1 token, replenish at 1 token per second. Consume 1 token per HasChunk, should take ~5s - throttledStore := NewTestRateLimitedLocalStore(t,float64(1),1) + throttledStore := NewTestRateLimitedLocalStore(t, float64(1), 1) chunk := makeChunk(t) start := time.Now() err := throttledStore.StoreChunk(chunk) - require.Nil(t,err) - count:= 0 - for count < 5{ + require.Nil(t, err) + count := 0 + for count < 5 { count++ has, err := throttledStore.HasChunk(chunk.ID()) - require.Nil(t,err) + require.Nil(t, err) require.True(t, has) } finish := time.Now() require.True(t, finish.Sub(start).Seconds() < 7) require.True(t, finish.Sub(start).Seconds() > 3) - + } func TestHTTPHandlerReadWriteWithThrottle(t *testing.T) { - - tests:= map[string]struct { + tests := map[string]struct { eventRate float64 - burstRate int - minTime float64 - maxTime float64 - ops int - readers int - } { - - "full bucket" : {10,220,0,5,100,100}, - "bucket with 50 tokens and a 10 t/s replenishment rate" : {10,50,13,17,100,100}, - "bucket with 1 tokens and a 0.2 t/s replenishment rate" : {0.2,1,45,55,5,5}, - + burstRate int + minTime float64 + maxTime float64 + ops int + readers int + }{ + + "full bucket": {10, 220, 0, 5, 100, 100}, + "Bucket with 1 tokens and a 0.2 t/s replenishment rate": {0.2, 1, 10, 20, 2, 2}, } for name, test := range tests { - t.Run(name,func(t *testing.T) { - - - - upstream:= NewTestRateLimitedLocalStore(t,test.eventRate,test.burstRate) - rw := httptest.NewServer(NewHTTPHandler(upstream, true, false, []converter{Compressor{}}, "")) - defer rw.Close() - - chunkIdChan := make(chan ChunkID,test.ops) - defer close(chunkIdChan) - - var wg sync.WaitGroup - rwStoreURL, _ := url.Parse(rw.URL) - start := time.Now() - for i:=0; i < test.readers; i++{ - - - go func () { - defer wg.Done() - - rwStore, err := NewRemoteHTTPStore(rwStoreURL, StoreOptions{}) - require.NoError(t, err) - dataIn := []byte("some data") - chunkIn := NewChunk(dataIn) - err = rwStore.StoreChunk(chunkIn) - require.NoError(t, err) - chunkIdChan <- chunkIn.ID() - - }() - wg.Add(1) - go func () { - defer wg.Done() - rwStore, err := NewRemoteHTTPStore(rwStoreURL, StoreOptions{}) - require.NoError(t, err) - id := <- chunkIdChan - hasChunk, err := rwStore.HasChunk(id) - require.NoError(t, err) - require.True(t, hasChunk) - - }() - wg.Add(1) + t.Run(name, func(t *testing.T) { + + upstream := NewTestRateLimitedLocalStore(t, test.eventRate, test.burstRate) + rw := httptest.NewServer(NewHTTPHandler(upstream, true, false, []converter{Compressor{}}, "")) + defer rw.Close() + + chunkIdChan := make(chan ChunkID, test.ops) + defer close(chunkIdChan) + + var wg sync.WaitGroup + rwStoreURL, _ := url.Parse(rw.URL) + start := time.Now() + for i := 0; i < test.readers; i++ { + + go func() { + defer wg.Done() + + rwStore, err := NewRemoteHTTPStore(rwStoreURL, StoreOptions{}) + require.NoError(t, err) + dataIn := []byte("some data") + chunkIn := NewChunk(dataIn) + err = rwStore.StoreChunk(chunkIn) + require.NoError(t, err) + chunkIdChan <- chunkIn.ID() + + }() + wg.Add(1) + go func() { + defer wg.Done() + rwStore, err := NewRemoteHTTPStore(rwStoreURL, StoreOptions{}) + require.NoError(t, err) + id := <-chunkIdChan + hasChunk, err := rwStore.HasChunk(id) + require.NoError(t, err) + require.True(t, hasChunk) + + }() + wg.Add(1) + } + + wg.Wait() + + finish := time.Now() + diff := finish.Sub(start).Seconds() + require.True(t, diff < test.maxTime) + require.True(t, diff > test.minTime) + }) } - - wg.Wait() - finish := time.Now() - diff := finish.Sub(start).Seconds() - require.True(t, diff < test.maxTime) - require.True(t, diff > test.minTime) - }) -} - } -func TestWithCopy(t *testing.T){ +func TestWithCopy(t *testing.T) { - tests := map[string]struct{ - + tests := map[string]struct { chunkCount int - eventRate float64 - burstRate int - minTime float64 - maxTime float64 + eventRate float64 + burstRate int + minTime float64 + maxTime float64 }{ - "empty-ish buckets 10 token per second" : {10,10,1,0,5}, - "full buckets one token per second" : {100,1,1000,0,5}, - "100 token buckets 500-hundred token per second" : {1000,500,100,6,9}, - - + "empty-ish buckets 10 token per second": {10, 10, 1, 0, 5}, + "full buckets one token per second": {100, 1, 1000, 0, 5}, + "100 token buckets 500-hundred token per second": {1000, 500, 100, 6, 9}, } - for name, test := range tests{ - t.Run(name,func(t *testing.T) { - - - - chunkCount := test.chunkCount - chunkIds := make([]ChunkID, chunkCount) - remote := NewTestRateLimitedLocalStore(t,test.eventRate,int(test.burstRate)) - local := NewTestRateLimitedLocalStore(t,test.eventRate,int(test.burstRate)) - ctxt := context.Background() - pb := NewProgressBar("") - start := time.Now() - // This will consume chunkCount ops - storeLoop(t,chunkCount,chunkIds,*remote) - - // This will consume chunkCount*4 tokens - err := Copy(ctxt,chunkIds,remote,local,10, pb) - chunkCheck(t,chunkIds,*local) - require.Nil(t,err) - finish := time.Now() - diff := finish.Sub(start).Seconds() - require.True(t, diff < test.maxTime) - require.True(t, diff > test.minTime) + for name, test := range tests { + t.Run(name, func(t *testing.T) { + + chunkCount := test.chunkCount + chunkIds := make([]ChunkID, chunkCount) + remote := NewTestRateLimitedLocalStore(t, test.eventRate, int(test.burstRate)) + local := NewTestRateLimitedLocalStore(t, test.eventRate, int(test.burstRate)) + ctxt := context.Background() + pb := NewProgressBar("") + start := time.Now() + // This will consume chunkCount ops + storeLoop(t, chunkCount, chunkIds, *remote) + + // This will consume chunkCount*4 tokens + err := Copy(ctxt, chunkIds, remote, local, 10, pb) + chunkCheck(t, chunkIds, *local) + require.Nil(t, err) + finish := time.Now() + diff := finish.Sub(start).Seconds() + require.True(t, diff < test.maxTime) + require.True(t, diff > test.minTime) }) } -} \ No newline at end of file +}