Skip to content

Commit

Permalink
Refactor transport part 1 (#324)
Browse files Browse the repository at this point in the history
* refactor(transport): move libp2p message layer into transport

* refactor(graphsync): cleanup channel state tracking

* refactor(dtchannel): extract dtchannel to package

* test(impl): get remaining tests passing

* refactor(transport): much simpler interface

* style(lint): fix imports

* refactor(transport): remote unneccesary methods

* fix(rebase): fix errors after rebase
  • Loading branch information
hannahhoward committed Jun 16, 2022
1 parent c833220 commit 91ccb51
Show file tree
Hide file tree
Showing 30 changed files with 1,804 additions and 1,833 deletions.
4 changes: 2 additions & 2 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD

linkSystem := storeutil.LinkSystemForBlockstore(bstore)
gs := gsimpl.New(ctx, gsNet, linkSystem, gsimpl.RejectAllRequestsByDefault())
transport := gstransport.NewTransport(p, gs)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), dtNet, transport)
transport := gstransport.NewTransport(p, gs, dtNet)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), p, transport)
if err != nil {
return Instance{}, err
}
Expand Down
30 changes: 4 additions & 26 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type monitorAPI interface {
SubscribeToEvents(subscriber datatransfer.Subscriber) datatransfer.Unsubscribe
RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error
CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error
ConnectTo(context.Context, peer.ID) error
PeerID() peer.ID
}

Expand Down Expand Up @@ -84,18 +83,8 @@ func checkConfig(cfg *Config) {
}
}

// AddPushChannel adds a push channel to the channel monitor
func (m *Monitor) AddPushChannel(chid datatransfer.ChannelID) *monitoredChannel {
return m.addChannel(chid, true)
}

// AddPullChannel adds a pull channel to the channel monitor
func (m *Monitor) AddPullChannel(chid datatransfer.ChannelID) *monitoredChannel {
return m.addChannel(chid, false)
}

// addChannel adds a channel to the channel monitor
func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitoredChannel {
// AddChannel adds a channel to the channel monitor
func (m *Monitor) AddChannel(chid datatransfer.ChannelID, isPull bool) *monitoredChannel {
if !m.enabled() {
return nil
}
Expand All @@ -106,7 +95,7 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitore
// Check if there is already a monitor for this channel
if _, ok := m.channels[chid]; ok {
tp := "push"
if !isPush {
if isPull {
tp = "pull"
}
log.Warnf("ignoring add %s channel %s: %s channel with that id already exists",
Expand Down Expand Up @@ -454,22 +443,11 @@ func (mc *monitoredChannel) doRestartChannel() error {
}

func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
// Establish a connection to the peer, in case the connection went down.
// Note that at the networking layer there is logic to retry if a network
// connection cannot be established, so this may take some time.
p := mc.chid.OtherParty(mc.mgr.PeerID())
log.Debugf("%s: re-establishing connection to %s", mc.chid, p)
start := time.Now()
err := mc.mgr.ConnectTo(mc.ctx, p)
if err != nil {
return xerrors.Errorf("%s: failed to reconnect to peer %s after %s: %w",
mc.chid, p, time.Since(start), err)
}
log.Debugf("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))

// Send a restart message for the channel
log.Debugf("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
err := mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
}
Expand Down
38 changes: 13 additions & 25 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
}
testCases := []testCase{{
name: "attempt restart",
}, {
name: "fail to reconnect to peer",
errReconnect: true,
}, {
name: "fail to send restart message",
errSendRestartMsg: true,
Expand All @@ -41,7 +38,7 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
for _, tc := range testCases {
t.Run(name+": "+tc.name, func(t *testing.T) {
ch := testutil.NewMockChannelState(testutil.MockChannelStateParams{ChannelID: ch1})
mockAPI := newMockMonitorAPI(ch, tc.errReconnect, tc.errSendRestartMsg)
mockAPI := newMockMonitorAPI(ch, tc.errSendRestartMsg)

triggerErrorEvent := func() {
if isPush {
Expand All @@ -59,9 +56,9 @@ func TestChannelMonitorAutoRestart(t *testing.T) {

var mch *monitoredChannel
if isPush {
mch = m.AddPushChannel(ch1)
mch = m.AddChannel(ch1, false)
} else {
mch = m.AddPullChannel(ch1)
mch = m.AddChannel(ch1, true)
}

// Simulate the responder sending Accept
Expand Down Expand Up @@ -115,7 +112,7 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
runTest := func(name string, isPush bool) {
t.Run(name, func(t *testing.T) {
ch := testutil.NewMockChannelState(testutil.MockChannelStateParams{ChannelID: ch1})
mockAPI := newMockMonitorAPI(ch, false, false)
mockAPI := newMockMonitorAPI(ch, false)

triggerErrorEvent := func() {
if isPush {
Expand All @@ -134,12 +131,12 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {

var mch *monitoredChannel
if isPush {
mch = m.AddPushChannel(ch1)
mch = m.AddChannel(ch1, false)

mockAPI.dataQueued(10)
mockAPI.dataSent(5)
} else {
mch = m.AddPullChannel(ch1)
mch = m.AddChannel(ch1, true)

mockAPI.dataReceived(5)
}
Expand Down Expand Up @@ -198,7 +195,7 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
runTest := func(name string, isPush bool) {
t.Run(name, func(t *testing.T) {
ch := testutil.NewMockChannelState(testutil.MockChannelStateParams{ChannelID: ch1})
mockAPI := newMockMonitorAPI(ch, false, false)
mockAPI := newMockMonitorAPI(ch, false)

triggerErrorEvent := func() {
if isPush {
Expand All @@ -216,12 +213,12 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
})

if isPush {
m.AddPushChannel(ch1)
m.AddChannel(ch1, false)

mockAPI.dataQueued(10)
mockAPI.dataSent(5)
} else {
m.AddPullChannel(ch1)
m.AddChannel(ch1, true)

mockAPI.dataReceived(5)
}
Expand Down Expand Up @@ -285,7 +282,7 @@ func TestChannelMonitorTimeouts(t *testing.T) {
for _, tc := range testCases {
t.Run(name+": "+tc.name, func(t *testing.T) {
ch := testutil.NewMockChannelState(testutil.MockChannelStateParams{ChannelID: ch1})
mockAPI := newMockMonitorAPI(ch, false, false)
mockAPI := newMockMonitorAPI(ch, false)

verifyClosedAndShutdown := func(chCtx context.Context, timeout time.Duration) {
mockAPI.verifyChannelClosed(t, true)
Expand All @@ -310,10 +307,10 @@ func TestChannelMonitorTimeouts(t *testing.T) {

var chCtx context.Context
if isPush {
mch := m.AddPushChannel(ch1)
mch := m.AddChannel(ch1, false)
chCtx = mch.ctx
} else {
mch := m.AddPullChannel(ch1)
mch := m.AddChannel(ch1, true)
chCtx = mch.ctx
}

Expand Down Expand Up @@ -370,7 +367,6 @@ func verifyChannelShutdown(t *testing.T, shutdownCtx context.Context) {

type mockMonitorAPI struct {
ch *testutil.MockChannelState
connectErrors bool
restartErrors bool
restartMessages chan struct{}
closeErr chan error
Expand All @@ -379,10 +375,9 @@ type mockMonitorAPI struct {
subscribers map[int]datatransfer.Subscriber
}

func newMockMonitorAPI(ch *testutil.MockChannelState, errOnReconnect, errOnRestart bool) *mockMonitorAPI {
func newMockMonitorAPI(ch *testutil.MockChannelState, errOnRestart bool) *mockMonitorAPI {
return &mockMonitorAPI{
ch: ch,
connectErrors: errOnReconnect,
restartErrors: errOnRestart,
restartMessages: make(chan struct{}, 100),
closeErr: make(chan error, 1),
Expand Down Expand Up @@ -414,13 +409,6 @@ func (m *mockMonitorAPI) fireEvent(e datatransfer.Event, state datatransfer.Chan
}
}

func (m *mockMonitorAPI) ConnectTo(ctx context.Context, id peer.ID) error {
if m.connectErrors {
return xerrors.Errorf("connect err")
}
return nil
}

func (m *mockMonitorAPI) PeerID() peer.ID {
return "p"
}
Expand Down
30 changes: 8 additions & 22 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,6 @@ import (

type Notifier func(datatransfer.Event, datatransfer.ChannelState)

// ErrNotFound is returned when a channel cannot be found with a given channel ID
type ErrNotFound struct {
ChannelID datatransfer.ChannelID
}

func (e *ErrNotFound) Error() string {
return "No channel for channel ID " + e.ChannelID.String()
}

func NewErrNotFound(chid datatransfer.ChannelID) error {
return &ErrNotFound{ChannelID: chid}
}

// ErrWrongType is returned when a caller attempts to change the type of implementation data after setting it
var ErrWrongType = errors.New("Cannot change type of implementation specific data after setting it")

Expand All @@ -50,8 +37,6 @@ type Channels struct {

// ChannelEnvironment -- just a proxy for DTNetwork for now
type ChannelEnvironment interface {
Protect(id peer.ID, tag string)
Unprotect(id peer.ID, tag string) bool
ID() peer.ID
CleanupChannel(chid datatransfer.ChannelID)
}
Expand Down Expand Up @@ -109,15 +94,15 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {

// CreateNew creates a new channel id and channel state and saves to channels.
// returns error if the channel exists already.
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector datamodel.Node, voucher datatransfer.TypedVoucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector datamodel.Node, voucher datatransfer.TypedVoucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, datatransfer.Channel, error) {
var responder peer.ID
if dataSender == initiator {
responder = dataReceiver
} else {
responder = dataSender
}
chid := datatransfer.ChannelID{Initiator: initiator, Responder: responder, ID: tid}
err := c.stateMachines.Begin(chid, &internal.ChannelState{
channel := &internal.ChannelState{
SelfPeer: selfPeer,
TransferID: tid,
Initiator: initiator,
Expand All @@ -134,13 +119,14 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
},
},
Status: datatransfer.Requested,
})
}
err := c.stateMachines.Begin(chid, channel)
if err != nil {
log.Errorw("failed to create new tracking channel for data-transfer", "channelID", chid, "err", err)
return datatransfer.ChannelID{}, err
return datatransfer.ChannelID{}, nil, err
}
log.Debugw("created tracking channel for data-transfer, emitting channel Open event", "channelID", chid)
return chid, c.stateMachines.Send(chid, datatransfer.Open)
return chid, c.fromInternalChannelState(*channel), c.stateMachines.Send(chid, datatransfer.Open)
}

// InProgress returns a list of in progress channels
Expand All @@ -164,7 +150,7 @@ func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (da
var internalChannel internal.ChannelState
err := c.stateMachines.GetSync(ctx, chid, &internalChannel)
if err != nil {
return nil, NewErrNotFound(chid)
return nil, datatransfer.ErrChannelNotFound
}
return c.fromInternalChannelState(internalChannel), nil
}
Expand Down Expand Up @@ -448,7 +434,7 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran
}
if !has {
return xerrors.Errorf("cannot send FSM event %s to data-transfer channel %s: %w",
datatransfer.Events[code], chid, NewErrNotFound(chid))
datatransfer.Events[code], chid, datatransfer.ErrChannelNotFound)
}
return nil
}
Expand Down
1 change: 0 additions & 1 deletion channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ func cleanupConnection(ctx fsm.Context, env ChannelEnvironment, channel internal
otherParty = channel.Responder
}
env.CleanupChannel(datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder})
env.Unprotect(otherParty, datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder}.String())
return ctx.Trigger(datatransfer.CleanupComplete)
}

Expand Down
Loading

0 comments on commit 91ccb51

Please sign in to comment.