Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge the v0.6.x commit history #190

Merged
merged 18 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,113 @@ some significant breaking changes.
| Hannah Howard | 2 | +3316/-3015 | 25 |
| Steven Allen | 1 | +95/-227 | 5 |

# go-graphsync 0.6.8

### Changelog

- github.com/ipfs/go-graphsync:
- refactor: replace particular request not found errors with public error (#188) ([ipfs/go-graphsync#188](https://github.com/ipfs/go-graphsync/pull/188))
- fix(responsemanager): fix error codes (#182) ([ipfs/go-graphsync#182](https://github.com/ipfs/go-graphsync/pull/182))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 1 | +100/-51 | 5 |
| dirkmc | 1 | +10/-3 | 2 |

# go-graphsync 0.6.7

### Changelog

- github.com/ipfs/go-graphsync:
- Add cancel request and wait function (#185) ([ipfs/go-graphsync#185](https://github.com/ipfs/go-graphsync/pull/185))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 1 | +154/-32 | 9 |
# go-graphsync 0.6.6

### Changelog

- github.com/ipfs/go-graphsync:
- feat(requestmanager): add request timing (#181) ([ipfs/go-graphsync#181](https://github.com/ipfs/go-graphsync/pull/181))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 1 | +9/-1 | 1 |

# go-graphsync 0.6.5

### Changelog

- github.com/ipfs/go-graphsync:
- Resolve 175 race condition, no change to hook timing (#178) ([ipfs/go-graphsync#178](https://github.com/ipfs/go-graphsync/pull/178))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 1 | +199/-171 | 10 |

# go-graphsync 0.6.4

### Changelog

- github.com/ipfs/go-graphsync:
- feat/request-queued-hook (#172) ([ipfs/go-graphsync#172](https://github.com/ipfs/go-graphsync/pull/172))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| aarshkshah1992 | 3 | +87/-3 | 7 |
| dirkmc | 1 | +11/-0 | 1 |

# go-graphsync 0.6.3

### Changelog

- github.com/ipfs/go-graphsync:
- Fix/log blockstore reads (#169) ([ipfs/go-graphsync#169](https://github.com/ipfs/go-graphsync/pull/169))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Aarsh Shah | 2 | +40/-177 | 6 |

# go-graphsync 0.6.2

### Changelog

- github.com/ipfs/go-graphsync:
- Better logging for Graphsync traversal (#167) ([ipfs/go-graphsync#167](https://github.com/ipfs/go-graphsync/pull/167))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Aarsh Shah | 1 | +18/-2 | 2 |

# go-graphsync 0.6.1

### Changelog

- github.com/ipfs/go-graphsync:
- feat: fire network error when network disconnects during request (#164) ([ipfs/go-graphsync#164](https://github.com/ipfs/go-graphsync/pull/164))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| dirkmc | 1 | +86/-8 | 4 |


# go-graphsync 0.6.0

Major code refactor for simplicity, ease of understanding
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-cbor v0.0.5 // indirect
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log/v2 v2.1.1
github.com/ipfs/go-merkledag v0.3.2
github.com/ipfs/go-peertaskqueue v0.2.0
github.com/ipfs/go-unixfs v0.2.4
Expand Down
26 changes: 22 additions & 4 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ const (
ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key")
)

// RequestContextCancelledErr is an error message received on the error channel when the request context given by the user is cancelled/times out
type RequestContextCancelledErr struct{}
// RequestClientCancelledErr is an error message received on the error channel when the request is cancelled on by the client code,
// either by closing the passed request context or calling CancelRequest
type RequestClientCancelledErr struct{}

func (e RequestContextCancelledErr) Error() string {
return "request context cancelled"
func (e RequestClientCancelledErr) Error() string {
return "request cancelled by client"
}

// RequestFailedBusyErr is an error message received on the error channel when the peer is busy
Expand Down Expand Up @@ -86,6 +87,13 @@ func (e RequestCancelledErr) Error() string {
return "request failed - responder cancelled"
}

// RequestNotFoundErr indicates that a request with a particular request ID was not found
type RequestNotFoundErr struct{}

func (e RequestNotFoundErr) Error() string {
return "request not found"
}

var (
// ErrExtensionAlreadyRegistered means a user extension can be registered only once
ErrExtensionAlreadyRegistered = errors.New("extension already registered")
Expand Down Expand Up @@ -197,6 +205,10 @@ type RequestUpdatedHookActions interface {
UnpauseResponse()
}

// OnIncomingRequestQueuedHook is a hook that runs each time a new incoming request is added to the responder's task queue.
// It receives the peer that sent the request and all data about the request.
type OnIncomingRequestQueuedHook func(p peer.ID, request RequestData)

// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It receives an interface for customizing the response to this request
Expand Down Expand Up @@ -262,6 +274,9 @@ type GraphExchange interface {
// UnregisterPersistenceOption unregisters an alternate loader/storer combo
UnregisterPersistenceOption(name string) error

// RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added to the responder's task queue.
RegisterIncomingRequestQueuedHook(hook OnIncomingRequestQueuedHook) UnregisterHookFunc

// RegisterIncomingRequestHook adds a hook that runs when a request is received
RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc

Expand Down Expand Up @@ -312,4 +327,7 @@ type GraphExchange interface {

// CancelResponse cancels an in progress response
CancelResponse(peer.ID, RequestID) error

// CancelRequest cancels an in progress request
CancelRequest(context.Context, RequestID) error
}
18 changes: 16 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package graphsync
import (
"context"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -43,6 +43,7 @@ type GraphSync struct {
responseAssembler *responseassembler.ResponseAssembler
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestQueuedHooks *responderhooks.IncomingRequestQueuedHooks
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
Expand Down Expand Up @@ -124,6 +125,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
networkErrorListeners := listeners.NewNetworkErrorListeners()
receiverErrorListeners := listeners.NewReceiverNetworkErrorListeners()
persistenceOptions := persistenceoptions.New()
requestQueuedHooks := responderhooks.NewRequestQueuedHooks()
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
Expand All @@ -142,7 +144,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
graphSync := &GraphSync{
network: network,
linkSystem: linkSystem,
Expand All @@ -152,6 +154,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseAssembler: responseAssembler,
peerTaskQueue: peerTaskQueue,
peerManager: peerManager,
incomingRequestQueuedHooks: requestQueuedHooks,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
Expand Down Expand Up @@ -190,6 +193,12 @@ func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingReques
return gs.incomingRequestHooks.Register(hook)
}

// RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added
// to the responder's task queue.
func (gs *GraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc {
return gs.incomingRequestQueuedHooks.Register(hook)
}

// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
return gs.incomingResponseHooks.Register(hook)
Expand Down Expand Up @@ -285,6 +294,11 @@ func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) er
return gs.responseManager.CancelResponse(p, requestID)
}

// CancelRequest cancels an in progress request
func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
return gs.requestManager.CancelRequest(ctx, requestID)
}

type graphSyncReceiver GraphSync

func (gsr *graphSyncReceiver) graphSync() *GraphSync {
Expand Down
4 changes: 2 additions & 2 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func TestNetworkDisconnect(t *testing.T) {

testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error")
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error())
testutil.AssertReceive(ctx, t, receiverError, &err, "should receive an error on receiver side")
}

Expand Down Expand Up @@ -652,7 +652,7 @@ func TestConnectFail(t *testing.T) {
var err error
testutil.AssertReceive(ctx, t, reqNetworkError, &err, "should receive network error")
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error())
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions ipldutil/traverser.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector"
)

/* TODO: This traverser creates an extra go-routine and is quite complicated, in order to give calling code control of
a selector traversal. If it were implemented inside of go-ipld-primes traversal library, with access to private functions,
it could be done without an extra go-routine, avoiding the possibility of races and simplifying implementation. This has
been documented here: https://github.com/ipld/go-ipld-prime/issues/213 -- and when this issue is implemented, this traverser
can go away */

var defaultLinkSystem = cidlink.DefaultLinkSystem()

var defaultVisitor traversal.AdvVisitFn = func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }
Expand Down Expand Up @@ -45,6 +51,8 @@ type Traverser interface {
Error(err error)
// Shutdown cancels the traversal
Shutdown(ctx context.Context)
// NBlocksTraversed returns the number of blocks successfully traversed
NBlocksTraversed() int
}

type state struct {
Expand All @@ -64,6 +72,7 @@ type nextResponse struct {
func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
ctx, cancel := context.WithCancel(parentCtx)
t := &traverser{
blocksCount: 0,
parentCtx: parentCtx,
ctx: ctx,
cancel: cancel,
Expand Down Expand Up @@ -100,6 +109,7 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
// traverser is a class to perform a selector traversal that stops every time a new block is loaded
// and waits for manual input (in the form of advance or error)
type traverser struct {
blocksCount int
parentCtx context.Context
ctx context.Context
cancel func()
Expand All @@ -118,6 +128,10 @@ type traverser struct {
stopped chan struct{}
}

func (t *traverser) NBlocksTraversed() int {
return t.blocksCount
}

func (t *traverser) loader(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
select {
case <-t.ctx.Done():
Expand Down Expand Up @@ -159,6 +173,7 @@ func (t *traverser) writeDone(err error) {
func (t *traverser) start() {
select {
case <-t.ctx.Done():
close(t.stopped)
return
case t.awaitRequest <- struct{}{}:
}
Expand Down Expand Up @@ -218,16 +233,20 @@ func (t *traverser) Advance(reader io.Reader) error {
if isComplete {
return errors.New("cannot advance when done")
}

select {
case <-t.ctx.Done():
return ContextCancelError{}
case t.awaitRequest <- struct{}{}:
}

select {
case <-t.ctx.Done():
return ContextCancelError{}
case t.responses <- nextResponse{reader, nil}:
}

t.blocksCount++
return nil
}

Expand Down
17 changes: 17 additions & 0 deletions ipldutil/traverser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"testing"
"time"

blocks "github.com/ipfs/go-block-format"
ipld "github.com/ipld/go-ipld-prime"
Expand All @@ -21,6 +22,22 @@ import (
func TestTraverser(t *testing.T) {
ctx := context.Background()

t.Run("started with shutdown context, then shutdown", func(t *testing.T) {
cancelledCtx, cancel := context.WithCancel(ctx)
cancel()
testdata := testutil.NewTestIPLDTree()
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
traverser := TraversalBuilder{
Root: testdata.RootNodeLnk,
Selector: sel,
}.Start(cancelledCtx)
timeoutCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
traverser.Shutdown(timeoutCtx)
require.NoError(t, timeoutCtx.Err())
})

t.Run("traverses correctly, simple struct", func(t *testing.T) {
testdata := testutil.NewTestIPLDTree()
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
Expand Down
3 changes: 2 additions & 1 deletion message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
status == graphsync.RequestFailedContentNotFound ||
status == graphsync.RequestFailedLegal ||
status == graphsync.RequestFailedUnknown ||
status == graphsync.RequestCancelled
status == graphsync.RequestCancelled ||
status == graphsync.RequestRejected
}

// IsTerminalResponseCode returns true if the response code signals
Expand Down
2 changes: 1 addition & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"

gsmsg "github.com/ipfs/go-graphsync/message"
Expand Down
Loading