From d5bad97eda016d13c85d281e2541ca04a7ac7aab Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 23 Sep 2021 20:54:59 -0700 Subject: [PATCH] fix(requestmanager): correct cancel request sending make sure not to send cancel when other peer has already sent terminal status --- requestmanager/executor/executor.go | 4 +++- requestmanager/executor/executor_test.go | 8 ++------ requestmanager/server.go | 4 +++- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index d6d755ba..be62eaa1 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -73,7 +73,9 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask. log.Debugw("beginning request execution", "id", requestTask.Request.ID(), "peer", pid.String(), "root_cid", requestTask.Request.Root().String()) err := e.traverse(requestTask) if err != nil { - e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID())) + if !isContextErr(err) { + e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID())) + } if !isContextErr(err) && !isPausedErr(err) { select { case <-requestTask.Ctx.Done(): diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index 4eb508fe..4b59067f 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -66,9 +66,7 @@ func TestRequestExecutionBlockChain(t *testing.T) { verifyResults: func(t *testing.T, tbc *testutil.TestBlockChain, ree *requestExecutionEnv, responses []graphsync.ResponseProgress, receivedErrors []error) { tbc.VerifyResponseRangeSync(responses, 0, 6) require.Empty(t, receivedErrors) - require.Len(t, ree.requestsSent, 2) - require.Equal(t, ree.request, ree.requestsSent[0].request) - require.True(t, ree.requestsSent[1].request.IsCancel()) + require.Equal(t, []requestSent{{ree.p, ree.request}}, ree.requestsSent) require.Len(t, ree.blookHooksCalled, 6) require.EqualError(t, ree.terminalError, ipldutil.ContextCancelError{}.Error()) }, @@ -265,7 +263,6 @@ type pauseKey struct { type requestExecutionEnv struct { // params ctx context.Context - cancelFn func() request gsmsg.GraphSyncRequest p peer.ID blockHookResults map[blockHookKey]hooks.UpdateResult @@ -276,7 +273,6 @@ type requestExecutionEnv struct { traverser ipldutil.Traverser inProgressErr chan error initialRequest bool - empty bool // results requestsSent []requestSent @@ -307,7 +303,7 @@ func (ree *requestExecutionEnv) GetRequestTask(_ peer.ID, _ *peertask.Task, requ Traverser: ree.traverser, P: ree.p, InProgressErr: ree.inProgressErr, - Empty: ree.empty, + Empty: false, InitialRequest: ree.initialRequest, } go func() { diff --git a/requestmanager/server.go b/requestmanager/server.go index 1c735e34..e5f6f3d9 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -198,6 +198,7 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, onTermina if onTerminated != nil { inProgressRequestStatus.onTerminated = append(inProgressRequestStatus.onTerminated, onTerminated) } + rm.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(requestID)) rm.cancelOnError(requestID, inProgressRequestStatus, terminalError) } @@ -263,6 +264,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg if !ok { return false } + rm.SendRequest(requestStatus.p, gsmsg.CancelRequest(response.RequestID())) rm.cancelOnError(response.RequestID(), requestStatus, result.Err) return false } @@ -333,7 +335,7 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error { if !ok { return graphsync.RequestNotFoundErr{} } - if inProgressRequestStatus.state != running { + if inProgressRequestStatus.state == paused { return errors.New("request is already paused") } select {