diff --git a/cmd/desync/cache.go b/cmd/desync/cache.go
index 8c2f8f9..95d6136 100644
--- a/cmd/desync/cache.go
+++ b/cmd/desync/cache.go
@@ -14,6 +14,7 @@ type cacheOptions struct {
 	cache         string
 	ignoreIndexes []string
 	ignoreChunks  []string
+	throttleRateMillis int
 }
 
 func newCacheCommand(ctx context.Context) *cobra.Command {
diff --git a/copy.go b/copy.go
index 940d4c8..79c26d4 100644
--- a/copy.go
+++ b/copy.go
@@ -11,9 +11,11 @@ import (
 // store to populate a cache. If progress is provided, it'll be called when a
 // chunk has been processed. Used to draw a progress bar, can be nil.
 func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar) error {
+	in := make(chan ChunkID)
 	g, ctx := errgroup.WithContext(ctx)
+
 	// Setup and start the progressbar if any
 	pb.SetTotal(len(ids))
 	pb.Start()
 	defer pb.Finish()
 
diff --git a/copy_test.go b/copy_test.go
new file mode 100644
index 0000000..a7bdf3f
--- /dev/null
+++ b/copy_test.go
@@ -0,0 +1,38 @@
+package desync
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestCopy(t *testing.T) {
+	srcStoreDir := t.TempDir()
+	dstStoreDir := t.TempDir()
+
+	srcStore, err := NewLocalStore(srcStoreDir, StoreOptions{})
+	require.NoError(t, err)
+
+	dstStore, err := NewLocalStore(dstStoreDir, StoreOptions{})
+	require.NoError(t, err)
+
+	chunkData := []byte("some data")
+	chunk := NewChunk(chunkData)
+	chunkID := chunk.ID()
+
+	err = srcStore.StoreChunk(chunk)
+	require.NoError(t, err)
+	hasStoredChunk, err := srcStore.HasChunk(chunkID)
+	require.NoError(t, err)
+	require.True(t, hasStoredChunk)
+
+	chunks := []ChunkID{chunkID}
+
+	err = Copy(context.Background(), chunks, srcStore, dstStore, 1, NewProgressBar(""))
+	require.NoError(t, err)
+
+	hasChunk, err := dstStore.HasChunk(chunkID)
+	require.NoError(t, err)
+	require.True(t, hasChunk)
+}
diff --git a/go.mod b/go.mod
index 8e34698..8f6dac8 100644
--- a/go.mod
+++ b/go.mod
@@ -59,6 +59,7 @@
 	golang.org/x/oauth2 v0.7.0 // indirect
 	golang.org/x/term v0.15.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
+	golang.org/x/time v0.5.0 // indirect
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
diff --git a/go.sum b/go.sum
index 3dabb6c..55e3461 100644
--- a/go.sum
+++ b/go.sum
@@ -202,6 +202,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
 golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
+golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
diff --git a/ratelimitstore.go b/ratelimitstore.go
new file mode 100644
index 0000000..333fd54
--- /dev/null
+++ b/ratelimitstore.go
@@ -0,0 +1,75 @@
+package desync
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+	"golang.org/x/time/rate"
+)
+
+// ThrottleOptions configures the token bucket used by RateLimitedStore.
+// eventRate is the sustained number of operations per second and burstRate
+// is the bucket size, i.e. how many operations may proceed at once.
+type ThrottleOptions struct {
+	eventRate float64
+	burstRate int
+}
+
+// RateLimitedStore wraps a WriteStore and throttles chunk operations with a
+// token bucket limiter, charging one token per operation.
+type RateLimitedStore struct {
+	wrappedStore WriteStore
+	limiter      *rate.Limiter
+	options      ThrottleOptions
+}
+
+// ErrRateLimitExceeded is returned when a token could not be acquired.
+var ErrRateLimitExceeded = errors.New("rate limit exceeded")
+
+// NewRateLimitedStore returns a store that wraps s and limits the rate of
+// its chunk operations according to the given options.
+func NewRateLimitedStore(s WriteStore, options ThrottleOptions) *RateLimitedStore {
+	limiter := rate.NewLimiter(rate.Limit(options.eventRate), options.burstRate)
+	return &RateLimitedStore{wrappedStore: s, limiter: limiter, options: options}
+}
+
+// GetChunk reads a chunk from the wrapped store and charges one token.
+func (s RateLimitedStore) GetChunk(id ChunkID) (*Chunk, error) {
+	chunk, err := s.wrappedStore.GetChunk(id)
+	if err != nil {
+		return chunk, err
+	}
+	err = s.limiter.WaitN(context.Background(), 1)
+	return chunk, err
+}
+
+// HasChunk queries the wrapped store and charges one token.
+func (s RateLimitedStore) HasChunk(id ChunkID) (bool, error) {
+	has, err := s.wrappedStore.HasChunk(id)
+	if err != nil {
+		return has, err
+	}
+	err = s.limiter.WaitN(context.Background(), 1)
+	return has, err
+}
+
+// StoreChunk writes a chunk to the wrapped store and charges one token.
+func (s RateLimitedStore) StoreChunk(chunk *Chunk) error {
+	// This isn't ideal: what we're really interested in is the size over the
+	// wire, but for now every chunk operation costs a single token.
+	_, err := chunk.Data()
+	if err != nil {
+		return err
+	}
+	if err := s.limiter.WaitN(context.Background(), 1); err != nil {
+		return ErrRateLimitExceeded
+	}
+	return s.wrappedStore.StoreChunk(chunk)
+}
+
+func (s RateLimitedStore) String() string {
+	return s.wrappedStore.String()
+}
+
+// Close closes the wrapped store.
+func (s RateLimitedStore) Close() error { return s.wrappedStore.Close() }
diff --git a/ratelimitstore_test.go b/ratelimitstore_test.go
new file mode 100644
index 0000000..db6bf8e
--- /dev/null
+++ b/ratelimitstore_test.go
@@ -0,0 +1,271 @@
+package desync
+
+import (
+	"context"
+	"crypto/rand"
+	"net/http/httptest"
+	"net/url"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"golang.org/x/time/rate"
+)
+
+func NewTestRateLimitedLocalStore(t *testing.T, eventRate float64, burstRate int) *RateLimitedStore {
+	storeDir := t.TempDir()
+	localStore, err := NewLocalStore(storeDir, StoreOptions{})
+	require.NoError(t, err)
+
+	throttleOptions := ThrottleOptions{eventRate, burstRate}
+	store := NewRateLimitedStore(localStore, throttleOptions)
+	require.Equal(t, burstRate, store.options.burstRate)
+	return store
+}
+
+func randomData() ([]byte, error) {
+	b := make([]byte, 16)
+	_, err := rand.Read(b)
+	if err != nil {
+		return b, err
+	}
+	return b, nil
+}
+
+func makeChunk(t *testing.T) *Chunk {
+	data, err := randomData()
+	require.NoError(t, err)
+	return NewChunk(data)
+}
+
+func storeLoop(t *testing.T, max int, chunkIDs []ChunkID, store RateLimitedStore) {
+	for i := 0; i < max; i++ {
+		chunk := makeChunk(t)
+		chunkIDs[i] = chunk.ID()
+		err := store.StoreChunk(chunk)
+		require.Nil(t, err)
+	}
+}
+
+func chunkCheck(t *testing.T, chunkIDs []ChunkID, store RateLimitedStore) {
+	for i := 0; i < len(chunkIDs); i++ {
+		has, err := store.HasChunk(chunkIDs[i])
+		require.Nil(t, err)
+		require.True(t, has)
+	}
+}
+
+func TestLimiter(t *testing.T) {
+	// This exercises the rate package itself rather than our code. It's here
+	// to document how the event rate and the burst size interact.
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000)
+	defer cancel()
+
+	limiter := rate.NewLimiter(rate.Limit(50), 50)
+	err := limiter.WaitN(ctx, 1)
+	require.Nil(t, err)
+
+	limiter = rate.NewLimiter(rate.Limit(2), 1)
+	err = limiter.WaitN(ctx, 1)
+	require.Nil(t, err)
+	err = limiter.WaitN(ctx, 1)
+	require.Nil(t, err)
+
+	limiter = rate.NewLimiter(rate.Limit(1), 1)
+	err = limiter.WaitN(ctx, 1)
+	require.Nil(t, err)
+	err = limiter.WaitN(ctx, 1)
+	require.NotNil(t, err)
+}
+
+func TestStoreWithSlowReplenishmentRate(t *testing.T) {
+	throttledStore := NewTestRateLimitedLocalStore(t, 1, 1)
+	chunkData := []byte("different data")
+	chunk := NewChunk(chunkData)
+
+	start := time.Now()
+	// We start with 1 token in the bucket and replenish at 1 token per second,
+	// so storing 10 chunks should take ~9-10 seconds. We only assert that it
+	// takes more than 8s to guard against flakiness.
+	for i := 0; i < 10; i++ {
+		err := throttledStore.StoreChunk(chunk)
+		// This test will eventually fail when I get deadlines enabled
+		require.Nil(t, err)
+	}
+	finish := time.Now()
+
+	require.True(t, finish.Sub(start).Seconds() > 8)
+}
+
+func TestForAFullBucketNoWait(t *testing.T) {
+	chunkCount := 10
+	// The bucket starts with chunkCount+1 tokens, more than we will use
+	throttledStore := NewTestRateLimitedLocalStore(t, 1, chunkCount+1)
+
+	chunkIDs := make([]ChunkID, chunkCount)
+	start := time.Now()
+	// The bucket is full, we shouldn't have to wait
+	storeLoop(t, chunkCount, chunkIDs, *throttledStore)
+	finish := time.Now()
+	require.True(t, finish.Sub(start).Seconds() < 2)
+	chunkCheck(t, chunkIDs, *throttledStore)
+}
+
+func TestForAFastReplenishmentRateLittleWait(t *testing.T) {
+	chunkCount := 10
+	// The bucket only has one token, but we replenish chunkCount+1 tokens per second
+	throttledStore := NewTestRateLimitedLocalStore(t, float64(chunkCount+1), 1)
+
+	start := time.Now()
+
+	chunkIDs := make([]ChunkID, chunkCount)
+	storeLoop(t, chunkCount, chunkIDs, *throttledStore)
+
+	finish := time.Now()
+	require.True(t, finish.Sub(start).Seconds() < 2)
+	chunkCheck(t, chunkIDs, *throttledStore)
+}
+
+func TestNoTimeout(t *testing.T) {
+	chunkCount := 10
+	// The bucket starts with one token and refills at one per second, so this
+	// takes ~9s. The limiter uses a background context, so it never times out.
+	throttledStore := NewTestRateLimitedLocalStore(t, 1, 1)
+
+	chunkIDs := make([]ChunkID, chunkCount)
+	storeLoop(t, chunkCount, chunkIDs, *throttledStore)
+}
+
+func TestHasNoChunk(t *testing.T) {
+	throttledStore := NewTestRateLimitedLocalStore(t, 2, 2)
+	chunk := makeChunk(t)
+	has, err := throttledStore.HasChunk(chunk.ID())
+	require.Nil(t, err)
+	require.False(t, has)
+}
+
+func TestStoresAndHasChunk(t *testing.T) {
+	throttledStore := NewTestRateLimitedLocalStore(t, 2, 2)
+	chunk := makeChunk(t)
+	err := throttledStore.StoreChunk(chunk)
+	require.Nil(t, err)
+	has, err := throttledStore.HasChunk(chunk.ID())
+	require.Nil(t, err)
+	require.True(t, has)
+}
+
+func TestStoresAndHasChunkWithWaits(t *testing.T) {
+	// Start with 1 token and replenish at 1 token per second. Each HasChunk
+	// consumes 1 token, so five calls should take ~5s.
+	throttledStore := NewTestRateLimitedLocalStore(t, float64(1), 1)
+	chunk := makeChunk(t)
+	start := time.Now()
+	err := throttledStore.StoreChunk(chunk)
+	require.Nil(t, err)
+	for i := 0; i < 5; i++ {
+		has, err := throttledStore.HasChunk(chunk.ID())
+		require.Nil(t, err)
+		require.True(t, has)
+	}
+	finish := time.Now()
+	require.True(t, finish.Sub(start).Seconds() < 7)
+	require.True(t, finish.Sub(start).Seconds() > 3)
+}
+
+func TestHTTPHandlerReadWriteWithThrottle(t *testing.T) {
+	tests := map[string]struct {
+		eventRate float64
+		burstRate int
+		minTime   float64
+		maxTime   float64
+		ops       int
+		readers   int
+	}{
+		"full bucket": {10, 220, 0, 5, 100, 100},
+		"bucket with 1 token and a 0.2 tokens/s replenishment rate": {0.2, 1, 10, 20, 2, 2},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			upstream := NewTestRateLimitedLocalStore(t, test.eventRate, test.burstRate)
+			rw := httptest.NewServer(NewHTTPHandler(upstream, true, false, []converter{Compressor{}}, ""))
+			defer rw.Close()
+
+			chunkIDChan := make(chan ChunkID, test.ops)
+			defer close(chunkIDChan)
+
+			var wg sync.WaitGroup
+			rwStoreURL, _ := url.Parse(rw.URL)
+			start := time.Now()
+			for i := 0; i < test.readers; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					rwStore, err := NewRemoteHTTPStore(rwStoreURL, StoreOptions{})
+					require.NoError(t, err)
+					dataIn := []byte("some data")
+					chunkIn := NewChunk(dataIn)
+					err = rwStore.StoreChunk(chunkIn)
+					require.NoError(t, err)
+					chunkIDChan <- chunkIn.ID()
+				}()
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					rwStore, err := NewRemoteHTTPStore(rwStoreURL, StoreOptions{})
+					require.NoError(t, err)
+					id := <-chunkIDChan
+					hasChunk, err := rwStore.HasChunk(id)
+					require.NoError(t, err)
+					require.True(t, hasChunk)
+				}()
+			}
+
+			wg.Wait()
+
+			finish := time.Now()
+			diff := finish.Sub(start).Seconds()
+			require.True(t, diff < test.maxTime)
+			require.True(t, diff > test.minTime)
+		})
+	}
+}
+
+func TestWithCopy(t *testing.T) {
+	tests := map[string]struct {
+		chunkCount int
+		eventRate  float64
+		burstRate  int
+		minTime    float64
+		maxTime    float64
+	}{
+		"empty-ish buckets, 10 tokens per second":  {10, 10, 1, 0, 5},
+		"full buckets, one token per second":       {100, 1, 1000, 0, 5},
+		"100 token buckets, 500 tokens per second": {1000, 500, 100, 6, 9},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			chunkCount := test.chunkCount
+			chunkIDs := make([]ChunkID, chunkCount)
+			remote := NewTestRateLimitedLocalStore(t, test.eventRate, test.burstRate)
+			local := NewTestRateLimitedLocalStore(t, test.eventRate, test.burstRate)
+			ctx := context.Background()
+			pb := NewProgressBar("")
+			start := time.Now()
+			// Storing the chunks consumes one token per chunk on the remote store
+			storeLoop(t, chunkCount, chunkIDs, *remote)
+
+			// Copy consumes further tokens: GetChunk on the remote, HasChunk and
+			// StoreChunk on the local, plus one HasChunk per chunk in chunkCheck
+			err := Copy(ctx, chunkIDs, remote, local, 10, pb)
+			require.Nil(t, err)
+			chunkCheck(t, chunkIDs, *local)
+			finish := time.Now()
+			diff := finish.Sub(start).Seconds()
+			require.True(t, diff < test.maxTime)
+			require.True(t, diff > test.minTime)
+		})
+	}
+}