Skip to content

Commit

Permalink
refactor(requestmanager): use state var
Browse files Browse the repository at this point in the history
use a state var to track operational model
  • Loading branch information
hannahhoward committed Sep 24, 2021
1 parent f30e8bd commit b80966d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
10 changes: 9 additions & 1 deletion requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,22 @@ const (
defaultPriority = graphsync.Priority(0)
)

type state uint64

const (
queued state = iota
running
paused
)

type inProgressRequestStatus struct {
ctx context.Context
startTime time.Time
cancelFn func()
p peer.ID
terminalError error
pauseMessages chan struct{}
paused bool
state state
lastResponse atomic.Value
onTerminated []chan<- error
request gsmsg.GraphSyncRequest
Expand Down
12 changes: 7 additions & 5 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (rm *RequestManager) newRequest(p peer.ID, root ipld.Link, selector ipld.No
pauseMessages: make(chan struct{}, 1),
doNotSendCids: doNotSendCids,
request: request,
state: queued,
nodeStyleChooser: hooksResult.CustomChooser,
inProgressChan: make(chan graphsync.ResponseProgress),
inProgressErr: make(chan error),
Expand Down Expand Up @@ -119,6 +120,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
}.Start(ipr.ctx)
}

ipr.state = running
return executor.RequestTask{
Ctx: ipr.ctx,
Request: ipr.request,
Expand Down Expand Up @@ -174,7 +176,7 @@ func (rm *RequestManager) releaseRequestTask(p peer.ID, task *peertask.Task, err
return
}
if _, ok := err.(hooks.ErrPaused); ok {
ipr.paused = true
ipr.state = paused
return
}
log.Infow("graphsync request complete", "request id", requestID, "peer", ipr.p, "total time", time.Since(ipr.startTime))
Expand Down Expand Up @@ -203,7 +205,7 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr
if ipr.terminalError == nil {
ipr.terminalError = terminalError
}
if ipr.paused {
if ipr.state != running {
rm.terminateRequest(requestID, ipr)
} else {
ipr.cancelFn()
Expand Down Expand Up @@ -317,10 +319,10 @@ func (rm *RequestManager) unpause(id graphsync.RequestID, extensions []graphsync
if !ok {
return graphsync.RequestNotFoundErr{}
}
if !inProgressRequestStatus.paused {
if inProgressRequestStatus.state != paused {
return errors.New("request is not paused")
}
inProgressRequestStatus.paused = false
inProgressRequestStatus.state = queued
inProgressRequestStatus.request = inProgressRequestStatus.request.ReplaceExtensions(extensions)
rm.requestQueue.PushTask(inProgressRequestStatus.p, peertask.Task{Topic: id, Priority: math.MaxInt32, Work: 1})
return nil
Expand All @@ -331,7 +333,7 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error {
if !ok {
return graphsync.RequestNotFoundErr{}
}
if inProgressRequestStatus.paused {
if inProgressRequestStatus.state != running {
return errors.New("request is already paused")
}
select {
Expand Down

0 comments on commit b80966d

Please sign in to comment.