Lassie fetching for retrieval #325
// getContent fetches content through Lassie and writes the CAR file to an output writer
func (r *Retriever) getContent(ctx context.Context, c cid.Cid, rangeStart int64, rangeEnd int64, sps []string, carOutput io.Writer) error {
	writable, err := storage.NewWritable(carOutput, []cid.Cid{c}, car.WriteAsCarV1(true))
The round trip through a CARv1 seems unfortunate: serializing and deserializing when you already have the blocks is a tad inefficient. Perhaps you should make this accept a storage.WritableStorage, have the "deserializer" accept a storage.ReadableStorage, and make your pipe a custom storage implementation that takes blocks from the write side, queues them up, and puts them out on the read side, checking that the expected read CID matches the queued CID and erroring if not. You could also apply the backpressure there, preventing writes until you get a read, or allowing however much buffering you want.
I did something similar here: https://github.com/ipld/go-car/blob/master/cmd/car/extract.go#L380C6-L380C22 but it has to account for the no-dupes case and it is decoding a CAR. The point is that it provides a ReadableStorage to the unixfs reifier.
Ah, I see what you mean: skip CARv1 streaming and just make a block pipe? Interesting!
storage.Pipe seems like an interesting concept, though I wonder if it belongs in go-ipld-prime :P
The other thing is that the length-prefixed encoding of a CAR is what makes using io.Pipe possible.
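To illustrate why length prefixes matter over a byte pipe: CARv1 prefixes each section with an unsigned varint length, so a reader on the far side of an io.Pipe knows where each section ends. The sketch below shows only that framing idea with the standard library; `writeFrame`, `readFrame`, and `roundTrip` are hypothetical helpers, and real CAR sections also carry a CID inside each frame.

```go
package main

import (
	"bufio"
	"encoding/binary"
	"io"
)

// writeFrame writes an unsigned varint length prefix followed by the payload.
func writeFrame(w io.Writer, payload []byte) error {
	var lenBuf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(lenBuf[:], uint64(len(payload)))
	if _, err := w.Write(lenBuf[:n]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one length-prefixed frame. It returns io.EOF when the
// stream ends cleanly on a frame boundary. A production reader should
// cap the size before allocating.
func readFrame(r *bufio.Reader) ([]byte, error) {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	payload := make([]byte, size)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip streams frames through an io.Pipe and collects them on the
// read side, recovering the original boundaries from the prefixes.
func roundTrip(frames [][]byte) ([][]byte, error) {
	pr, pw := io.Pipe()
	defer pr.Close() // unblocks the writer if the reader stops early
	go func() {
		for _, f := range frames {
			if err := writeFrame(pw, f); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
		pw.Close()
	}()
	var out [][]byte
	br := bufio.NewReader(pr)
	for {
		f, err := readFrame(br)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, f)
	}
}
```

Without the prefix, the reader would see an undifferentiated byte stream and could not tell where one block ends and the next begins.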
00ca1cc
to
a18c41f
Compare
Codecov Report
Patch coverage:
Additional details and impacted files
@@ Coverage Diff @@
## main #325 +/- ##
==========================================
- Coverage 74.33% 73.88% -0.45%
==========================================
Files 137 140 +3
Lines 8793 9096 +303
==========================================
+ Hits 6536 6721 +185
- Misses 1589 1674 +85
- Partials 668 701 +33
d783522
to
0e6617e
Compare
err := db.Table("deals").Select("provider").
	Joins("JOIN cars ON deals.piece_cid = cars.piece_cid").
	Where("cars.job_id = ? and deals.state IN (?)", jobID, []model.DealState{
		model.DealPublished,
Are you trying to see if the miners who have published the deal might have sealed it before the tracking gets updated?
Do you want to add DealExpired?
Data is generally retrievable from Boost as soon as the deal is published. Sealing just creates the sealed copy.
handler/file/retrieve.go
func findProviders(db *gorm.DB, jobID model.JobID) ([]string, error) {
	var deals []deal
	err := db.Table("deals").Select("provider").
Do you need to add distinct here?
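The question matters because several deals can point at the same provider, so the joined query may return the same provider more than once. In the query itself this would likely be handled with GORM's `Distinct` method. As a fallback illustration, here is a minimal sketch of deduplicating in Go after the query; `uniqueProviders` is a hypothetical helper, not part of the PR.

```go
package main

// uniqueProviders returns the input with duplicates removed, keeping
// the first occurrence of each provider and preserving order.
func uniqueProviders(providers []string) []string {
	seen := make(map[string]struct{}, len(providers))
	out := make([]string, 0, len(providers))
	for _, p := range providers {
		if _, ok := seen[p]; ok {
			continue
		}
		seen[p] = struct{}{}
		out = append(out, p)
	}
	return out
}
```

Doing this in SQL is preferable when possible, since it avoids transferring duplicate rows at all.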
	readLen = remainingBytes
}

fileRanges, err := findFileRanges(r.db, r.id, r.offset, r.offset+readLen)
There is some room to optimize here to reduce DB and network requests. Even during a sequential read, the length of the buffer p increases gradually, so you may end up calling findFileRanges and findProviders multiple times.
How about finding all file ranges the first time Read() is called, so we can reduce the database load?
And probably providers too.
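One way to follow this suggestion is to load the file ranges once and answer later lookups from memory. The sketch below assumes a hypothetical `fileRange` type and a `load` callback standing in for the single database query; neither is the PR's actual model or API. The same pattern would apply to the provider list.

```go
package main

import "sync"

// fileRange is a hypothetical stand-in for the stored file range model.
type fileRange struct {
	Offset int64
	Length int64
}

// rangeCache loads all ranges on first use and serves every later
// lookup from memory, so repeated Read calls do not hit the database.
type rangeCache struct {
	once   sync.Once
	load   func() ([]fileRange, error) // hypothetical loader, e.g. one DB query
	ranges []fileRange
	err    error
}

// overlapping returns the cached ranges that intersect [start, end).
// Note that a load error is cached as well; a real implementation may
// want to retry instead.
func (c *rangeCache) overlapping(start, end int64) ([]fileRange, error) {
	c.once.Do(func() { c.ranges, c.err = c.load() })
	if c.err != nil {
		return nil, c.err
	}
	var out []fileRange
	for _, r := range c.ranges {
		if r.Offset < end && r.Offset+r.Length > start {
			out = append(out, r)
		}
	}
	return out, nil
}
```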
	return err
}
request.Duplicates = true
request.Protocols = []multicodec.Code{multicodec.TransportIpfsGatewayHttp}
Have you evaluated using the piece gateway? Each fileRange is stored sequentially in the file, so you can get the CarBlocks of a fileRange and stream that fileRange with a single HTTP request using a Range header, validating the content while streaming.
Or we should make it configurable, so the user can choose between retrieval methods based on a header value, e.g. x-transport-protocol.
adds a utility to fetch unixfs files from sps
implements the full read seeker for filecoin and wires everything up
add unit test and add various comments
and test of retrieve handler in both Filecoin and non-filecoin variants and correct mistakes
435f602
to
4407193
Compare
Goals
support retrieval for data that is no longer available in the source storage
Implementation
Core implementation pieces:
For Discussion
Everything is unit tested. It's relatively difficult to implement a full integration test without an SP or mock SP to test against, so the current plan is to integration test through Motion. In the future, we might consider implementing a full integration test using https://github.com/ipld/frisbii/
There's a significant optimization still to be done. Currently, we read from remotes based on the size of the buffer passed to Read on the io.ReadSeeker implementation, which is probably about 2k. For longer reads, it would make sense to fetch full FileRange objects. At the same time, we probably need to know the bounds of the HTTP request ahead of time, so we know how far ahead we can read without wasting the SPs' network bandwidth. I didn't take this on here because the ticket is already large.
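The read-ahead rule described above can be stated as a small function: fetch to the end of the current FileRange, but never past the end of what the HTTP request asked for. This is a sketch of that rule only; `fetchLen` and its parameters are hypothetical and do not appear in the PR.

```go
package main

// fetchLen returns how many bytes to request from the remote when
// reading at offset. It reads ahead to the end of the current file
// range (rangeEnd, exclusive) but never past the end of the HTTP
// request (requestEnd, exclusive), so no bandwidth is spent on bytes
// the client did not ask for.
func fetchLen(offset, rangeEnd, requestEnd int64) int64 {
	limit := rangeEnd
	if requestEnd < limit {
		limit = requestEnd
	}
	if limit <= offset {
		return 0
	}
	return limit - offset
}
```

With this rule the size of the buffer passed to Read no longer determines the remote request size; Read would be served from the prefetched bytes.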