Add option to discard invalid seeds (#203)
If we have a seed index for a file/device that is corrupted, we may want
to try harder and continue anyway by discarding the invalid seed and
falling back to the remaining seeds, if any, and/or to the store.

This behavior is enabled by the new "--skip-invalid-seeds" option.

To avoid reducing installation speed, the process of choosing and
validating the chunks has been refactored around a new concept called a
"plan": we decide ahead of time where to look for each chunk, before we
start actually writing to the destination.

Signed-off-by: Ludovico de Nittis <ludovico.denittis@collabora.com>
RyuzakiKK authored Dec 30, 2021
1 parent 0bac841 commit a4c6fd2
Showing 14 changed files with 298 additions and 100 deletions.
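
With this change, a corrupted seed index no longer has to abort an extraction: the new flag tells desync to discard the bad seed and fall back to any remaining seeds and/or the store. A hypothetical invocation (the store and index paths are illustrative, not from this commit):

desync extract \
  --store /path/to/store \
  --seed old-release.caibx \
  --skip-invalid-seeds \
  new-release.caibx new-release.img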
79 changes: 66 additions & 13 deletions assemble.go
@@ -3,11 +3,25 @@ package desync
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"os"
)

"golang.org/x/sync/errgroup"
// InvalidSeedAction represents the action that we will take if a seed
// happens to be invalid. There are currently two options: either fail with
// an error or skip the invalid seed and try to continue.
type InvalidSeedAction int

const (
// InvalidSeedActionBailOut aborts the assembly with an error when an invalid seed is found.
InvalidSeedActionBailOut InvalidSeedAction = iota
// InvalidSeedActionSkip discards the invalid seed and continues with the remaining seeds and the store.
InvalidSeedActionSkip
)

type AssembleOptions struct {
N int
InvalidSeedAction InvalidSeedAction
}

// AssembleFile re-assembles a file based on a list of index chunks. It runs n
// goroutines, creating one filehandle for the file "name" per goroutine
// and writes to the file simultaneously. If progress is provided, it'll be
@@ -16,7 +30,7 @@ import (
// confirm if the data matches what is expected and only populate areas that
// differ from the expected content. This can be used to complete partly
// written files.
func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []Seed, n int, pb ProgressBar) (*ExtractStats, error) {
func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []Seed, options AssembleOptions, pb ProgressBar) (*ExtractStats, error) {
type Job struct {
segment IndexSegment
source SeedSegment
@@ -78,21 +92,22 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
return stats, err
}
defer ns.close()
seeds = append([]Seed{ns}, seeds...)

// Start a self-seed which will become usable once chunks are written contiguously
// beginning at position 0.
// beginning at position 0. There is no need to add this to the seeds list because
// when we create a plan it will be empty.
ss, err := newSelfSeed(name, idx)
if err != nil {
return stats, err
}
seeds = append([]Seed{ns, ss}, seeds...)

// Record the total number of seeds and blocksize in the stats
stats.Seeds = len(seeds)
stats.Blocksize = blocksize

// Start the workers, each having its own filehandle to write concurrently
for i := 0; i < n; i++ {
for i := 0; i < options.N; i++ {
f, err := os.OpenFile(name, os.O_RDWR, 0666)
if err != nil {
return stats, fmt.Errorf("unable to open file %s, %s", name, err)
@@ -104,6 +119,8 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
pb.Add(job.segment.lengthChunks())
}
if job.source != nil {
// If we have a seedSegment we expect 1 or more chunks between
// the start and the end of this segment.
stats.addChunksFromSeed(uint64(job.segment.lengthChunks()))
offset := job.segment.start()
length := job.segment.lengthBytes()
@@ -118,7 +135,30 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
ss.add(job.segment)
continue
}

// If we don't have a seedSegment we expect an IndexSegment with just
// a single chunk, which we can take either from the selfSeed, the
// destination file, or the store.
if len(job.segment.chunks()) != 1 {
panic("Received an unexpected segment that doesn't contain just a single chunk")
}
c := job.segment.chunks()[0]

// If we already took this chunk from the store we can reuse it by looking
// into the selfSeed.
if segment := ss.getChunk(c.ID); segment != nil {
copied, cloned, err := segment.WriteInto(f, c.Start, c.Size, blocksize, isBlank)
if err != nil {
return err
}
stats.addBytesCopied(copied)
stats.addBytesCloned(cloned)
// Even if we already confirmed that this chunk is present in the
// self-seed, we still need to record it as being written, otherwise
// the self-seed position pointer doesn't advance as we expect.
ss.add(job.segment)
continue
}

// If we operate on an existing file there's a good chance we already
// have the data written for this chunk. Let's read it from disk and
// compare to what is expected.
Expand Down Expand Up @@ -162,19 +202,32 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
})
}

// Let the sequencer break up the index into segments, feed the workers, and
// stop if there are any errors
// Let the sequencer break up the index into segments, create and validate a plan,
// feed the workers, and stop if there are any errors
seq := NewSeedSequencer(idx, seeds...)
loop:
plan := seq.Plan()
for {
chunks, from, done := seq.Next()
if err := plan.Validate(ctx, options.N); err != nil {
// This plan has at least one invalid seed
if options.InvalidSeedAction == InvalidSeedActionBailOut {
return stats, err
}
// Skip the invalid seed and try again
Log.WithError(err).Info("Unable to use one of the chosen seeds, skipping it")
seq.Rewind()
plan = seq.Plan()
continue
}
// Found a valid plan
break
}

loop:
for _, segment := range plan {
select {
case <-ctx.Done():
break loop
case in <- Job{chunks, from}:
}
if done {
break
case in <- Job{segment.indexSegment, segment.source}:
}
}
close(in)
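For library consumers, AssembleFile now takes an AssembleOptions struct in place of the former goroutine count n. Below is a minimal sketch of the new call, assuming the standard github.com/folbricht/desync import path and that the index, store, and seeds were loaded elsewhere; the helper name is hypothetical and simply mirrors what the tests in this commit do:

package example

import (
	"context"

	"github.com/folbricht/desync"
)

// assembleSkippingInvalidSeeds writes the blob described by idx to name,
// discarding corrupted seeds instead of failing, i.e. the library-level
// equivalent of passing --skip-invalid-seeds.
func assembleSkippingInvalidSeeds(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed) (*desync.ExtractStats, error) {
	opts := desync.AssembleOptions{
		N:                 10, // worker goroutines, one filehandle each
		InvalidSeedAction: desync.InvalidSeedActionSkip,
	}
	// A nil ProgressBar disables progress reporting, as in the package tests.
	return desync.AssembleFile(ctx, name, idx, s, seeds, opts, nil)
}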
8 changes: 6 additions & 2 deletions assemble_test.go
@@ -127,7 +127,9 @@ func TestExtract(t *testing.T) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
defer os.Remove(test.outfile)
if _, err := AssembleFile(context.Background(), test.outfile, index, test.store, nil, 10, nil); err != nil {
if _, err := AssembleFile(context.Background(), test.outfile, index, test.store, nil,
AssembleOptions{10, InvalidSeedActionBailOut}, nil,
); err != nil {
t.Fatal(err)
}
b, err := ioutil.ReadFile(test.outfile)
@@ -268,7 +270,9 @@ func TestSeed(t *testing.T) {
seeds = append(seeds, seed)
}

if _, err := AssembleFile(context.Background(), dst.Name(), dstIndex, s, seeds, 10, nil); err != nil {
if _, err := AssembleFile(context.Background(), dst.Name(), dstIndex, s, seeds,
AssembleOptions{10, InvalidSeedActionBailOut}, nil,
); err != nil {
t.Fatal(err)
}
b, err := ioutil.ReadFile(dst.Name())
33 changes: 21 additions & 12 deletions cmd/desync/extract.go
@@ -14,12 +14,13 @@ import (

type extractOptions struct {
cmdStoreOptions
stores []string
cache string
seeds []string
seedDirs []string
inPlace bool
printStats bool
stores []string
cache string
seeds []string
seedDirs []string
inPlace bool
printStats bool
skipInvalidSeeds bool
}

func newExtractCommand(ctx context.Context) *cobra.Command {
@@ -50,6 +51,7 @@ the index from STDIN.`,
flags.StringSliceVarP(&opt.stores, "store", "s", nil, "source store(s)")
flags.StringSliceVar(&opt.seeds, "seed", nil, "seed indexes")
flags.StringSliceVar(&opt.seedDirs, "seed-dir", nil, "directory with seed index files")
flags.BoolVar(&opt.skipInvalidSeeds, "skip-invalid-seeds", false, "Skip seeds with invalid chunks")
flags.StringVarP(&opt.cache, "cache", "c", "", "store to be used as cache")
flags.BoolVarP(&opt.inPlace, "in-place", "k", false, "extract the file in place and keep it in case of error")
flags.BoolVarP(&opt.printStats, "print-stats", "", false, "print statistics")
@@ -100,11 +102,18 @@ func runExtract(ctx context.Context, opt extractOptions, args []string) error {
}
seeds = append(seeds, dSeeds...)

// By default, bail out if we encounter an invalid seed
invalidSeedAction := desync.InvalidSeedActionBailOut
if opt.skipInvalidSeeds {
invalidSeedAction = desync.InvalidSeedActionSkip
}
assembleOpt := desync.AssembleOptions{N: opt.n, InvalidSeedAction: invalidSeedAction}

var stats *desync.ExtractStats
if opt.inPlace {
stats, err = writeInplace(ctx, outFile, idx, s, seeds, opt.n)
stats, err = writeInplace(ctx, outFile, idx, s, seeds, assembleOpt)
} else {
stats, err = writeWithTmpFile(ctx, outFile, idx, s, seeds, opt.n)
stats, err = writeWithTmpFile(ctx, outFile, idx, s, seeds, assembleOpt)
}
if err != nil {
return err
@@ -115,7 +124,7 @@ func runExtract(ctx context.Context, opt extractOptions, args []string) error {
return nil
}

func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, n int) (*desync.ExtractStats, error) {
func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, assembleOpt desync.AssembleOptions) (*desync.ExtractStats, error) {
// Prepare a tempfile that'll hold the output during processing. Close it, we
// just need the name here since it'll be opened multiple times during write.
// Also make sure it gets removed regardless of any errors below.
@@ -128,19 +137,19 @@ func writeWithTmpFile(ctx context.Context, name string, idx desync.Index, s desy
defer os.Remove(tmp.Name())

// Build the blob from the chunks, writing everything into the tempfile
if stats, err = writeInplace(ctx, tmp.Name(), idx, s, seeds, n); err != nil {
if stats, err = writeInplace(ctx, tmp.Name(), idx, s, seeds, assembleOpt); err != nil {
return stats, err
}

// Rename the tempfile to the output file
return stats, os.Rename(tmp.Name(), name)
}

func writeInplace(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, n int) (*desync.ExtractStats, error) {
func writeInplace(ctx context.Context, name string, idx desync.Index, s desync.Store, seeds []desync.Seed, assembleOpt desync.AssembleOptions) (*desync.ExtractStats, error) {
pb := NewProgressBar("")

// Build the blob from the chunks, writing everything into given filename
return desync.AssembleFile(ctx, name, idx, s, seeds, n, pb)
return desync.AssembleFile(ctx, name, idx, s, seeds, assembleOpt, pb)
}

func readSeeds(dstFile string, locations []string, opts cmdStoreOptions) ([]desync.Seed, error) {
44 changes: 43 additions & 1 deletion cmd/desync/extract_test.go
@@ -48,13 +48,26 @@ func TestExtractCommand(t *testing.T) {
{"extract with multi seed",
[]string{"-s", "testdata/blob1.store", "--seed", "testdata/blob2.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
{"extract with seed directory",
[]string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "testdata/blob1.caibx"}, out1},
[]string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1},
{"extract with cache",
[]string{"-s", "testdata/blob1.store", "-c", cacheDir, "testdata/blob1.caibx"}, out1},
{"extract with multiple stores",
[]string{"-s", "testdata/blob2.store", "-s", "testdata/blob1.store", "testdata/blob1.caibx"}, out1},
{"extract with multiple stores and cache",
[]string{"-n", "1", "-s", "testdata/blob2.store", "-s", "testdata/blob1.store", "--cache", cacheDir, "testdata/blob1.caibx"}, out1},
{"extract with corrupted seed",
[]string{"--store", "testdata/blob1.store", "--seed", "testdata/blob2_corrupted.caibx", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1},
{"extract with multiple corrupted seeds",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1},
// Here we don't need `--skip-invalid-seeds` because we expect the blob1 seed to always be chosen,
// since it is a 1:1 match with the index that we want to write. We therefore never reach the point
// where we validate the corrupted seed.
{"extract with seed directory without skipping invalid seeds",
[]string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "testdata/blob1.caibx"}, out1},
// Same as above, no need for `--skip-invalid-seeds`
{"extract with multiple corrupted seeds",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
{"extract with single seed that has all the expected chunks",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1},
} {
t.Run(test.name, func(t *testing.T) {
cmd := newExtractCommand(context.Background())
@@ -96,3 +109,32 @@ func TestExtractWithFailover(t *testing.T) {
_, err = cmd.ExecuteC()
require.NoError(t, err)
}

func TestExtractWithInvalidSeeds(t *testing.T) {
outDir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(outDir)
out := filepath.Join(outDir, "out")

for _, test := range []struct {
name string
args []string
output string
}{
{"extract with corrupted seed",
[]string{"--store", "testdata/blob1.store", "--seed", "testdata/blob2_corrupted.caibx", "testdata/blob1.caibx"}, out},
{"extract with multiple corrupted seeds",
[]string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob2.caibx"}, out},
} {
t.Run(test.name, func(t *testing.T) {
cmd := newExtractCommand(context.Background())
cmd.SetArgs(append(test.args, test.output))

// Redirect the command's output and run it
stderr = ioutil.Discard
cmd.SetOutput(ioutil.Discard)
_, err := cmd.ExecuteC()
require.Error(t, err)
})
}
}
Binary file added cmd/desync/testdata/blob2_corrupted
1 change: 1 addition & 0 deletions cmd/desync/testdata/blob2_corrupted.caibx
Empty file.