test(file): add retrieve handler test
Add a test of the retrieve handler in both Filecoin and non-Filecoin variants and correct mistakes.
hannahhoward committed Sep 18, 2023
1 parent ee8cf7f commit 0e6617e
Showing 2 changed files with 247 additions and 2 deletions.
handler/file/retrieve.go: 3 additions & 2 deletions
@@ -130,7 +130,8 @@ func (r *filecoinReader) Read(p []byte) (int, error) {
 		return read, UnableToServeRangeError{Start: r.offset, End: fileRange.Offset, Err: ErrNoFileRangeRecord}
 	}
 	rangeReadLen := readLen
-	remainingRange := (fileRange.Offset + fileRange.Length) - r.offset
+	offsetInRange := r.offset - fileRange.Offset
+	remainingRange := fileRange.Length - offsetInRange
 	if rangeReadLen > remainingRange {
 		rangeReadLen = remainingRange
 	}
@@ -141,7 +142,7 @@ func (r *filecoinReader) Read(p []byte) (int, error) {
 	if err != nil || len(providers) == 0 {
 		return read, UnableToServeRangeError{Start: r.offset, End: r.offset + rangeReadLen, Err: ErrNoFilecoinDeals}
 	}
-	err = r.retriever.Retrieve(r.ctx, cid.Cid(fileRange.CID), r.offset, r.offset+rangeReadLen, providers, buf)
+	err = r.retriever.Retrieve(r.ctx, cid.Cid(fileRange.CID), offsetInRange, offsetInRange+rangeReadLen, providers, buf)
 	if err != nil {
 		return read, UnableToServeRangeError{
 			Start: r.offset,
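The change above makes the units explicit: r.offset is an absolute position within the whole file, while the retriever fetches from the DAG rooted at a single range's CID, so the offsets passed to Retrieve must be relative to that range (the remaining-range arithmetic itself is algebraically unchanged). A minimal standalone sketch of that translation, not part of this commit and with illustrative names only:

package main

import "fmt"

// toRangeRelative converts an absolute file offset into an offset within a
// range that starts at rangeOffset and spans rangeLength bytes, and reports
// how many bytes remain in the range from that point.
func toRangeRelative(fileOffset, rangeOffset, rangeLength int64) (offsetInRange, remaining int64) {
	offsetInRange = fileOffset - rangeOffset
	remaining = rangeLength - offsetInRange
	return offsetInRange, remaining
}

func main() {
	// Reading at absolute offset 1.5 MiB inside the second 1 MiB range,
	// which covers file bytes [1 MiB, 2 MiB).
	offsetInRange, remaining := toRangeRelative(3<<19, 1<<20, 1<<20)
	fmt.Println(offsetInRange, remaining) // 524288 524288
}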
handler/file/retrieve_test.go: 244 additions & 0 deletions
@@ -0,0 +1,244 @@
package file

import (
	"bytes"
	"context"
	"crypto/rand"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"testing"

	"github.com/data-preservation-programs/singularity/model"
	"github.com/data-preservation-programs/singularity/util/testutil"
	"github.com/gotidy/ptr"
	util "github.com/ipfs/boxo/util"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-unixfsnode/file"
	ufstestutil "github.com/ipfs/go-unixfsnode/testutil"
	dagpb "github.com/ipld/go-codec-dagpb"
	"github.com/ipld/go-ipld-prime/linking"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/ipld/go-ipld-prime/storage/memstore"
	"github.com/stretchr/testify/require"
	"gorm.io/gorm"
)

type testRange struct {
	expectedBytes []byte
	file ufstestutil.DirEntry
}

func TestRetrieveFileHandler(t *testing.T) {
	testCases := []struct {
		name string
		keepLocalFile bool
	}{
		{
			name: "from available local file",
			keepLocalFile: true,
		},
		{
			name: "via filecoin retriever",
			keepLocalFile: false,
		},
	}
	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {

			testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {

				path := t.TempDir()
				lsys := cidlink.DefaultLinkSystem()
				memSys := memstore.Store{
					Bag: make(map[string][]byte),
				}
				lsys.SetReadStorage(&memSys)
				lsys.SetWriteStorage(&memSys)
				lsys.TrustedStorage = true
				// Build four 1 MiB UnixFS files in an in-memory link system,
				// remembering the exact bytes generated for each range.
				ranges := make([]testRange, 0, 4)
				for i := 0; i < 4; i++ {
					expectedByteRange := make([]byte, 1<<20)
					expectedBytesWriter := bytes.NewBuffer(expectedByteRange)
					expectedBytesWriter.Reset()
					fileReader := io.TeeReader(rand.Reader, expectedBytesWriter)
					file := ufstestutil.GenerateFile(t, &lsys, fileReader, 1<<20)
					ranges = append(ranges, testRange{expectedByteRange, file})
				}

				name := "deletedFile.txt"
				if testCase.keepLocalFile {
					// For the local-file variant, write all four ranges to a real
					// file on disk so the handler can serve reads directly from it.
					osFile, err := os.CreateTemp(path, "push-*")
					require.NoError(t, err)
					for _, testRange := range ranges {
						_, err := osFile.Write(testRange.expectedBytes)
						require.NoError(t, err)
					}
					name = filepath.Base(osFile.Name())
					err = osFile.Close()
					require.NoError(t, err)
				}
				file := model.File{
					Path: name,
					Size: 4 << 20,
					Attachment: &model.SourceAttachment{
						Preparation: &model.Preparation{
							Name: "prep",
						},
						Storage: &model.Storage{
							Name: "source",
							Type: "local",
							Path: path,
						},
					},
				}
				err := db.Create(&file).Error
				require.NoError(t, err)
				jobs := make([]model.Job, 2)
				for i := 0; i < 2; i++ {
					job := model.Job{
						AttachmentID: file.Attachment.ID,
					}
					err = db.Create(&job).Error
					require.NoError(t, err)
					jobs[i] = job
				}

				// Each 1 MiB range of the file gets its own record; the first two
				// ranges belong to job 0 and the last two to job 1.
				for i, testRange := range ranges {
					fileRange := model.FileRange{
						FileID: file.ID,
						CID: model.CID(testRange.file.Root),
						Offset: int64(i) * (1 << 20),
						Length: 1 << 20,
						JobID: ptr.Of(jobs[i/2].ID),
					}
					err = db.Create(&fileRange).Error
					require.NoError(t, err)
				}

				testCids := make([]cid.Cid, 0, 2)
				for i := 0; i < 2; i++ {
					testCids = append(testCids, cid.NewCidV1(cid.Raw, util.Hash([]byte("test"+strconv.Itoa(i)))))
				}

				// One car per job, each with its own piece CID.
				for i, job := range jobs {
					car := model.Car{
						JobID: ptr.Of(job.ID),
						PieceCID: model.CID(testCids[i]),
						PreparationID: file.Attachment.PreparationID,
					}
					err = db.Create(&car).Error
					require.NoError(t, err)
				}

				// For each piece, create one active deal ("apples<i>") plus a second
				// deal that the handler should only treat as a source for the first
				// piece ("oranges0" is published, "oranges1" is merely proposed).
				deals := make([]model.Deal, 0, 4)
				for i, testCid := range testCids {

					deal := model.Deal{
						State: model.DealActive,
						PieceCID: model.CID(testCid),
						Provider: "apples" + strconv.Itoa(i),
						Wallet: &model.Wallet{},
					}
					err = db.Create(&deal).Error
					require.NoError(t, err)

					deals = append(deals, deal)
					state := model.DealPublished
					if i > 0 {
						state = model.DealProposed
					}
					deal = model.Deal{
						State: state,
						PieceCID: model.CID(testCid),
						Provider: "oranges" + strconv.Itoa(i),
						Wallet: &model.Wallet{},
					}
					err = db.Create(&deal).Error
					require.NoError(t, err)
					deals = append(deals, deal)
				}
				fr := &fakeRetriever{
					lsys: &lsys,
				}
				// Read the file starting at 0.5 MiB in 1 MiB chunks. Each read spans
				// two adjacent ranges; in the Filecoin variant the fake retriever
				// must be asked for range-relative offsets and the providers
				// holding each piece.
				seeker, _, _, err := Default.RetrieveFileHandler(ctx, db, fr, uint64(file.ID))
				require.NoError(t, err)
				_, err = seeker.Seek(1<<19, io.SeekStart)
				require.NoError(t, err)
				outBuf := make([]byte, 1<<20)
				_, err = seeker.Read(outBuf)
				require.NoError(t, err)
				require.Equal(t, outBuf, bytes.Join([][]byte{ranges[0].expectedBytes[1<<19 : 1<<20], ranges[1].expectedBytes[0 : 1<<19]}, nil))
				if !testCase.keepLocalFile {
					require.Len(t, fr.requests, 2)
					require.Equal(t, fr.requests[0], retrieveRequest{ranges[0].file.Root, 1 << 19, 1 << 20, []string{deals[0].Provider, deals[1].Provider}})
					require.Equal(t, fr.requests[1], retrieveRequest{ranges[1].file.Root, 0, 1 << 19, []string{deals[0].Provider, deals[1].Provider}})
					fr.requests = nil
				}
				_, err = seeker.Read(outBuf)
				require.NoError(t, err)
				require.Equal(t, outBuf, bytes.Join([][]byte{ranges[1].expectedBytes[1<<19 : 1<<20], ranges[2].expectedBytes[0 : 1<<19]}, nil))
				if !testCase.keepLocalFile {
					require.Len(t, fr.requests, 2)
					require.Equal(t, fr.requests[0], retrieveRequest{ranges[1].file.Root, 1 << 19, 1 << 20, []string{deals[0].Provider, deals[1].Provider}})
					require.Equal(t, fr.requests[1], retrieveRequest{ranges[2].file.Root, 0, 1 << 19, []string{deals[2].Provider}})
					fr.requests = nil
				}
				_, err = seeker.Read(outBuf)
				require.NoError(t, err)
				require.Equal(t, outBuf, bytes.Join([][]byte{ranges[2].expectedBytes[1<<19 : 1<<20], ranges[3].expectedBytes[0 : 1<<19]}, nil))
				if !testCase.keepLocalFile {
					require.Len(t, fr.requests, 2)
					require.Equal(t, fr.requests[0], retrieveRequest{ranges[2].file.Root, 1 << 19, 1 << 20, []string{deals[2].Provider}})
					require.Equal(t, fr.requests[1], retrieveRequest{ranges[3].file.Root, 0, 1 << 19, []string{deals[2].Provider}})
					fr.requests = nil
				}
				n, err := seeker.Read(outBuf)
				require.NoError(t, err)
				require.Equal(t, n, 1<<19)
				require.Equal(t, outBuf[:n], ranges[3].expectedBytes[1<<19:1<<20])
				if !testCase.keepLocalFile {
					require.Len(t, fr.requests, 1)
					require.Equal(t, fr.requests[0], retrieveRequest{ranges[3].file.Root, 1 << 19, 1 << 20, []string{deals[2].Provider}})
					fr.requests = nil
				}
			})
		})
	}
}

// retrieveRequest records the arguments of a single Retrieve call.
type retrieveRequest struct {
	c cid.Cid
	rangeStart int64
	rangeEnd int64
	sps []string
}

// fakeRetriever stands in for the Filecoin retriever: it records every request
// and serves the requested byte range from the in-memory UnixFS DAG.
type fakeRetriever struct {
	requests []retrieveRequest
	lsys *linking.LinkSystem
}

func (fr *fakeRetriever) Retrieve(ctx context.Context, c cid.Cid, rangeStart int64, rangeEnd int64, sps []string, out io.Writer) error {
	fr.requests = append(fr.requests, retrieveRequest{c, rangeStart, rangeEnd, sps})
	node, err := fr.lsys.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, dagpb.Type.PBNode)
	if err != nil {
		return err
	}
	fnode, err := file.NewUnixFSFile(ctx, node, fr.lsys)
	if err != nil {
		return err
	}
	nlr, err := fnode.AsLargeBytes()
	if err != nil {
		return err
	}
	_, err = nlr.Seek(rangeStart, io.SeekStart)
	if err != nil {
		return err
	}
	rangeLeftReader := io.LimitReader(nlr, rangeEnd-rangeStart)
	_, err = io.Copy(out, rangeLeftReader)
	return err
}
