-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DTM sends data over graphsync for validated push requests #665
Changes from 7 commits
5512f1d
cdefdf0
6f778d5
f85aba5
488473a
f034ddd
9826970
c0181ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,12 @@ | ||
package graphsyncimpl | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"reflect" | ||
"sync" | ||
|
||
"github.com/ipfs/go-cid" | ||
"github.com/ipfs/go-graphsync" | ||
|
@@ -43,6 +46,7 @@ type graphsyncImpl struct { | |
dataTransferNetwork network.DataTransferNetwork | ||
subscribers []datatransfer.Subscriber | ||
validatedTypes map[string]validateType | ||
channelsLk sync.RWMutex | ||
channels map[datatransfer.ChannelID]datatransfer.ChannelState | ||
gs graphsync.GraphExchange | ||
peerID peer.ID | ||
|
@@ -56,16 +60,75 @@ func NewGraphSyncDataTransfer(parent context.Context, host host.Host, gs graphsy | |
dataTransferNetwork, | ||
nil, | ||
make(map[string]validateType), | ||
sync.RWMutex{}, | ||
make(map[datatransfer.ChannelID]datatransfer.ChannelState), | ||
gs, | ||
host.ID(), | ||
0, | ||
} | ||
if err := gs.RegisterRequestReceivedHook(true, impl.GsReqRecdHook); err != nil { | ||
log.Error(err) | ||
return nil | ||
} | ||
receiver := &graphsyncReceiver{parent, impl} | ||
dataTransferNetwork.SetDelegate(receiver) | ||
return impl | ||
} | ||
|
||
// GsReqRecdHook is a graphsync.OnRequestReceivedHook hook | ||
// if an incoming request does not match a previous push request, it returns an error. | ||
func (impl *graphsyncImpl) GsReqRecdHook(p peer.ID, request graphsync.RequestData) ([]graphsync.ExtensionData, error) { | ||
var resp []graphsync.ExtensionData | ||
chid, _, err := impl.getChannelIDAndData(request) | ||
|
||
extData := graphsync.ExtensionData{ | ||
Name: ExtensionDataTransfer, | ||
Data: nil, | ||
} | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the extension not being present is not an error -- there could be a graphsync request on the graphsync node that simply isn't for DT that is treated like a normal request (i.e. Blockchain sync). But this exposes an API issue with the GS hooks I see. I need to expose change the api two a triple: []graphsync.ExtensionData, didValidate, error -- anyway, not your problem for now. |
||
return resp, err | ||
} | ||
if !impl.HasPushChannel(chid) { | ||
return resp, errors.New("could not find push channel") | ||
} | ||
resp = append(resp, extData) | ||
return resp, nil | ||
} | ||
|
||
// gsExtended is a small interface used by getChannelIDAndData | ||
type gsExtended interface { | ||
Extension(name graphsync.ExtensionName) ([]byte, bool) | ||
} | ||
|
||
// getChannelIDAndData extracts extension data and creates a channel id then returns | ||
// both. Returns any errors. | ||
func (impl *graphsyncImpl) getChannelIDAndData(extendedData gsExtended) (datatransfer.ChannelID, *ExtensionDataTransferData, error) { | ||
data, ok := extendedData.Extension(ExtensionDataTransfer) | ||
if !ok { | ||
return datatransfer.ChannelID{}, nil, errors.New("extension not present") | ||
} | ||
unm, err := impl.unmarshalExtensionData(data) | ||
if err != nil { | ||
return datatransfer.ChannelID{}, nil, err | ||
} | ||
chid := datatransfer.ChannelID{ | ||
Initiator: impl.peerID, | ||
ID: datatransfer.TransferID(unm.TransferID), | ||
} | ||
return chid, unm, nil | ||
} | ||
|
||
// unmarshalExtensionData instatiates an extension data struct & unmarshals data into i | ||
func (impl *graphsyncImpl) unmarshalExtensionData(data []byte) (*ExtensionDataTransferData, error) { | ||
var extStruct ExtensionDataTransferData | ||
|
||
reader := bytes.NewReader(data) | ||
if err := extStruct.UnmarshalCBOR(reader); err != nil { | ||
return nil, err | ||
} | ||
return &extStruct, nil | ||
} | ||
|
||
// RegisterVoucherType registers a validator for the given voucher type | ||
// returns error if: | ||
// * voucher type does not implement voucher | ||
|
@@ -124,7 +187,9 @@ func (impl *graphsyncImpl) OpenPullDataChannel(ctx context.Context, requestTo pe | |
func (impl *graphsyncImpl) createNewChannel(tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) datatransfer.ChannelID { | ||
chid := datatransfer.ChannelID{Initiator: initiator, ID: tid} | ||
chst := datatransfer.ChannelState{Channel: datatransfer.NewChannel(0, baseCid, selector, voucher, dataSender, dataReceiver, 0)} | ||
impl.channelsLk.Lock() | ||
impl.channels[chid] = chst | ||
impl.channelsLk.Unlock() | ||
return chid | ||
} | ||
|
||
|
@@ -192,21 +257,49 @@ func (impl *graphsyncImpl) notifySubscribers(evt datatransfer.Event, cs datatran | |
|
||
// get all in progress transfers | ||
func (impl *graphsyncImpl) InProgressChannels() map[datatransfer.ChannelID]datatransfer.ChannelState { | ||
return impl.channels | ||
impl.channelsLk.RLock() | ||
defer impl.channelsLk.RUnlock() | ||
channelsCopy := make(map[datatransfer.ChannelID]datatransfer.ChannelState, len(impl.channels)) | ||
for channelID, channelState := range impl.channels { | ||
channelsCopy[channelID] = channelState | ||
} | ||
return channelsCopy | ||
} | ||
|
||
// HasPushChannel returns true if a channel with ID chid exists and is for a Push request. | ||
func (impl *graphsyncImpl) HasPushChannel(chid datatransfer.ChannelID) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. similarly keep these functions private |
||
return impl.getPushChannel(chid) != datatransfer.EmptyChannelState | ||
} | ||
|
||
// HasPullChannel returns true if a channel with ID chid exists and is for a Pull request. | ||
func (impl *graphsyncImpl) HasPullChannel(chid datatransfer.ChannelID) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see above |
||
return impl.getPullChannel(chid) != datatransfer.EmptyChannelState | ||
} | ||
|
||
// getPullChannel searches for a pull-type channel in the slice of channels with id `chid`. | ||
// Returns datatransfer.EmptyChannelState if: | ||
// * there is no channel with that id | ||
// * it is not related to a pull request | ||
func (impl *graphsyncImpl) getPullChannel(chid datatransfer.ChannelID) datatransfer.ChannelState { | ||
impl.channelsLk.RLock() | ||
defer impl.channelsLk.RUnlock() | ||
channelState, ok := impl.channels[chid] | ||
if !ok || channelState.Sender() == impl.peerID { | ||
return datatransfer.EmptyChannelState | ||
} | ||
return channelState | ||
} | ||
|
||
func (impl *graphsyncImpl) getPushChannel(chid datatransfer.ChannelID) datatransfer.ChannelState { | ||
impl.channelsLk.RLock() | ||
defer impl.channelsLk.RUnlock() | ||
channelState, ok := impl.channels[chid] | ||
if !ok || channelState.Recipient() == impl.peerID { | ||
return datatransfer.EmptyChannelState | ||
} | ||
return channelState | ||
} | ||
|
||
// generateTransferID() generates a unique-to-runtime TransferID for use in creating | ||
// ChannelIDs | ||
func (impl *graphsyncImpl) generateTransferID() datatransfer.TransferID { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should not be a public function.
if it needs to be tested individually, it should be extracted to another module