diff --git a/requestmanager/client.go b/requestmanager/client.go index adbf421b..f72dcf07 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -41,6 +41,14 @@ const ( defaultPriority = graphsync.Priority(0) ) +type state uint64 + +const ( + queued state = iota + running + paused +) + type inProgressRequestStatus struct { ctx context.Context startTime time.Time @@ -48,7 +56,7 @@ type inProgressRequestStatus struct { p peer.ID terminalError error pauseMessages chan struct{} - paused bool + state state lastResponse atomic.Value onTerminated []chan<- error request gsmsg.GraphSyncRequest diff --git a/requestmanager/server.go b/requestmanager/server.go index 44f0a38e..1c735e34 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -79,6 +79,7 @@ func (rm *RequestManager) newRequest(p peer.ID, root ipld.Link, selector ipld.No pauseMessages: make(chan struct{}, 1), doNotSendCids: doNotSendCids, request: request, + state: queued, nodeStyleChooser: hooksResult.CustomChooser, inProgressChan: make(chan graphsync.ResponseProgress), inProgressErr: make(chan error), @@ -119,6 +120,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re }.Start(ipr.ctx) } + ipr.state = running return executor.RequestTask{ Ctx: ipr.ctx, Request: ipr.request, @@ -174,7 +176,7 @@ func (rm *RequestManager) releaseRequestTask(p peer.ID, task *peertask.Task, err return } if _, ok := err.(hooks.ErrPaused); ok { - ipr.paused = true + ipr.state = paused return } log.Infow("graphsync request complete", "request id", requestID, "peer", ipr.p, "total time", time.Since(ipr.startTime)) @@ -203,7 +205,7 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr if ipr.terminalError == nil { ipr.terminalError = terminalError } - if ipr.paused { + if ipr.state != running { rm.terminateRequest(requestID, ipr) } else { ipr.cancelFn() @@ -317,10 +319,10 @@ func (rm *RequestManager) unpause(id graphsync.RequestID, extensions []graphsync if !ok { return graphsync.RequestNotFoundErr{} } - if !inProgressRequestStatus.paused { + if inProgressRequestStatus.state != paused { return errors.New("request is not paused") } - inProgressRequestStatus.paused = false + inProgressRequestStatus.state = queued inProgressRequestStatus.request = inProgressRequestStatus.request.ReplaceExtensions(extensions) rm.requestQueue.PushTask(inProgressRequestStatus.p, peertask.Task{Topic: id, Priority: math.MaxInt32, Work: 1}) return nil @@ -331,7 +333,7 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error { if !ok { return graphsync.RequestNotFoundErr{} } - if inProgressRequestStatus.paused { + if inProgressRequestStatus.state != running { return errors.New("request is already paused") } select {