Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up VerifyIndex #206

Merged
merged 3 commits into from
Dec 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions cmd/desync/verifyindex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"bytes"
"context"
"github.com/stretchr/testify/require"
"testing"
)

// TestVerifyIndexCommand exercises the verify-index command against the
// testdata blobs: two matching index/blob pairs must verify cleanly, and a
// mismatched pair must fail.
func TestVerifyIndexCommand(t *testing.T) {
	// Validate the index of blob1; we expect it to complete without any error.
	verifyIndex := newVerifyIndexCommand(context.Background())
	verifyIndex.SetArgs([]string{"testdata/blob1.caibx", "testdata/blob1"})
	// Redirect the command's stderr into a buffer so test output stays quiet.
	// NOTE: the previous require.Contains(t, b.String(), "") assertion was
	// vacuous (every string contains ""), so it has been removed.
	b := new(bytes.Buffer)
	stderr = b
	_, err := verifyIndex.ExecuteC()
	require.NoError(t, err)

	// Do the same for blob2.
	verifyIndex = newVerifyIndexCommand(context.Background())
	verifyIndex.SetArgs([]string{"testdata/blob2.caibx", "testdata/blob2"})
	b = new(bytes.Buffer)
	stderr = b
	_, err = verifyIndex.ExecuteC()
	require.NoError(t, err)

	// Run again against the wrong blob; verification must report an error.
	verifyIndex = newVerifyIndexCommand(context.Background())
	verifyIndex.SetArgs([]string{"testdata/blob2.caibx", "testdata/blob1"})
	_, err = verifyIndex.ExecuteC()
	require.Error(t, err)
}
47 changes: 24 additions & 23 deletions verifyindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package desync
import (
"context"
"fmt"
"io"
"os"

"golang.org/x/sync/errgroup"
Expand All @@ -12,7 +11,7 @@ import (
// VerifyIndex re-calculates the checksums of a blob comparing it to a given index.
// Fails if the index does not match the blob.
func VerifyIndex(ctx context.Context, name string, idx Index, n int, pb ProgressBar) error {
in := make(chan IndexChunk)
in := make(chan []IndexChunk)
g, ctx := errgroup.WithContext(ctx)

// Setup and start the progressbar if any
Expand All @@ -39,41 +38,43 @@ func VerifyIndex(ctx context.Context, name string, idx Index, n int, pb Progress
defer f.Close()
g.Go(func() error {
for c := range in {
// Update progress bar if any
if pb != nil {
pb.Increment()
}

// Position the filehandle to the place where the chunk is meant to come
// from within the file
if _, err := f.Seek(int64(c.Start), io.SeekStart); err != nil {
return err
}

// Read the whole (uncompressed) chunk into memory
b := make([]byte, c.Size)
if _, err := io.ReadFull(f, b); err != nil {
// Reuse the fileSeedSegment structure, this is really just a seed segment after all
segment := newFileSeedSegment(name, c, false, false)
if err := segment.validate(f); err != nil {
return err
}

// Calculate this chunks checksum and compare to what it's supposed to be
// according to the index
sum := Digest.Sum(b)
if sum != c.ID {
return fmt.Errorf("checksum does not match chunk %s", c.ID)
// Update progress bar, if any
if pb != nil {
pb.Add(len(c))
}
}
return nil
})
}

chunksNum := len(idx.Chunks)

// Number of chunks that will be evaluated in a single Goroutine.
// This helps to reduce the required number of context switches.
// In theory, we could just divide the total number of chunks by the number
// of workers, but instead we reduce that by 10 times to avoid the situation
// where we end up waiting for a single worker that was slower to complete (e.g.
// if its chunks were not in cache while the others were).
batch := chunksNum / (n * 10)

// Feed the workers, stop if there are any errors
loop:
for _, c := range idx.Chunks {
for i := 0; i < chunksNum; i = i + batch + 1 {
last := i + batch
if last >= chunksNum {
// We reached the end of the array
last = chunksNum - 1
}
select {
case <-ctx.Done():
break loop
case in <- c:
case in <- idx.Chunks[i : last+1]:
}
}
close(in)
Expand Down