Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timed out operations are removed from funnel #6

Merged
merged 3 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions funnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ import (
"errors"
"sync"
"time"

"github.com/mohae/deepcopy"
"github.com/tevino/abool"
)

// timeoutError is the sentinel error returned by operationInProcess.wait when the timeout
// elapses before the operation completes. Callers detect it by comparing against this value.
var timeoutError = errors.New("Timeout expired while waiting for operation execution to complete")

// opResult holds the result from executing of operation
type opResult struct {

Expand All @@ -38,6 +42,12 @@ type operationInProcess struct {

// The result from executing of operation, will be available after the channel will be closed.
opResult

// true when this operation has been deleted from the funnel
deleted *abool.AtomicBool

// Time at which this operation started executing
startTime time.Time
}

// A Config structure is used to configure the Funnel
Expand Down Expand Up @@ -104,14 +114,18 @@ func New(option ...Option) *Funnel {

// wait blocks until the operation completes and then returns the operation's result and error,
// or returns a timeout error if the operation does not complete in time.
// If the operation ended with a panic, wait re-panics with the same value.
// NOTE(review): this listing contains two timeout cases (one on the full timeout, one on the
// remaining time) — they look like the before/after lines of a change; confirm which one is current.
func (op *operationInProcess) wait(timeout time.Duration) (res interface{}, err error) {

// Time that has already passed since the operation started executing.
operationElapsedTime := time.Since(op.startTime)
// What is left of the timeout, measured from the operation's start rather than from this call.
// NOTE(review): this can be zero or negative when wait is called late; presumably the timer then
// fires immediately — confirm against the time.After documentation.
operationTimeoutRemaining := timeout - operationElapsedTime

select {
case <-op.done:
if op.panicErr != nil { // If the operation ended with panic, this pending request also ends the same way.
panic(op.panicErr)
}
return op.res, op.err
case <-time.After(timeout):
return nil, errors.New("Timeout expired when waiting to operation in process")
case <-time.After(operationTimeoutRemaining):
return nil, timeoutError
}
}

Expand All @@ -129,27 +143,29 @@ func (f *Funnel) getOperationInProcess(operationId string, opExeFunc func() (int
op = &operationInProcess{
operationId: operationId,
done: make(chan empty),
startTime: time.Now(),
deleted: abool.New(),
}
f.opInProcess[operationId] = op

// Executing the operation
go func(opInProc *operationInProcess) {
// closeOperation must be performed within defer function to ensure the closure of the channel.
defer f.closeOperation(opInProc.operationId)
defer f.closeOperation(opInProc)
opInProc.res, opInProc.err = opExeFunc()
}(op)

return op
}

// Closes the operation by updates the operation's result and closure of done channel.
func (f *Funnel) closeOperation(operationId string) {
func (f *Funnel) closeOperation(op *operationInProcess) {
f.Lock()
defer f.Unlock()

op, found := f.opInProcess[operationId]
if !found {
panic("unexpected behavior, operation id not found")
//Check if the operation completed after a timeout which would result in the operation being deleted from the funnel.
if op.deleted.IsSet() {
return
}

if rr := recover(); rr != nil {
Expand All @@ -159,7 +175,7 @@ func (f *Funnel) closeOperation(operationId string) {
// Deletion of operationInProcess from the map will occur only when the cache time-to-live will be expired.
go func() {
time.Sleep(f.config.cacheTtl)
f.deleteOperation(operationId)
f.deleteOperation(op)
}()

// Releases all the goroutines which are waiting for the operation result.
Expand All @@ -169,10 +185,19 @@ func (f *Funnel) closeOperation(operationId string) {
// Delete the operation from the map.
// Once deleted, we do not hold the operation's result anymore, therefore any further request for the
// same operation will require re-execution of it.
// The operation's deleted flag is checked once before taking the funnel lock and once more while
// holding it, so the map entry is removed and the flag is set by only one caller.
// NOTE(review): this listing contains two signatures and two delete calls (keyed by operationId and
// by operation.operationId) — they look like the before/after lines of a change; confirm which is current.
func (f *Funnel) deleteOperation(operationId string) {
func (f *Funnel) deleteOperation(operation *operationInProcess) {
// Return without taking the lock when the operation is already marked as deleted.
if operation.deleted.IsSet() {
return
}

f.Lock()
defer f.Unlock()
delete(f.opInProcess, operationId)

// Every request that times out calls deleteOperation. Only the first call should carry out the
// deletion, because a stalled caller could otherwise delete a recreated operation with the same id.
if !operation.deleted.IsSet() {
delete(f.opInProcess, operation.operationId)
operation.deleted.SetTo(true)
}
}

// Execute receives an identifier of the operation and a callback function to execute.
Expand All @@ -183,6 +208,9 @@ func (f *Funnel) deleteOperation(operationId string) {
func (f *Funnel) Execute(operationId string, opExeFunc func() (interface{}, error)) (res interface{}, err error) {
	// Obtain the in-process operation registered under this id.
	inFlight := f.getOperationInProcess(operationId, opExeFunc)

	// Block until the operation completes or the configured timeout expires.
	res, err = inFlight.wait(f.config.timeout)
	if err != timeoutError {
		return res, err
	}

	// The wait timed out: remove the operation from the funnel so that a later
	// request for the same id is not tied to the timed-out operation.
	f.deleteOperation(inFlight)
	return res, err
}

Expand Down
94 changes: 92 additions & 2 deletions funnel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ func TestEndsWithPanic(t *testing.T) {
t.Error("unexpected panic message")
}
atomic.AddUint64(&numOfGoREndWithPanic, 1)
wg.Done()
}
}()
defer wg.Done()

if numOfGoroutine%2 == 1 {
time.Sleep(time.Millisecond * 500)
}
Expand Down Expand Up @@ -150,7 +151,7 @@ func TestWithTimeout(t *testing.T) {
return id + "ended successfully", errors.New("no error")
})

if res == nil && err.Error() == "Timeout expired when waiting to operation in process" {
if res == nil && err == timeoutError {
atomic.AddUint64(&numOfGoREndWithTimeout, 1)
}

Expand All @@ -165,3 +166,92 @@ func TestWithTimeout(t *testing.T) {
t.Error("Number of operations that ended with timeout expired is not as expected, expected ", numOfOperations*numOfGoroutines, ", got ", numOfGoREndWithTimeoutFinal)
}
}

// TestWithTimedoutReruns tests the ability of the funnel to create a new operation while an
// operation with the same id has timed out and its execution function is still running.
// An unexpired cache TTL on a timed-out operation should also not prohibit creating the new operation.
func TestWithTimedoutReruns(t *testing.T) {
	fnl := New(WithTimeout(time.Millisecond*50), WithCacheTtl(time.Millisecond*100))

	var numOfGoREndWithTimeout uint64
	var numOfStartedOperations uint64

	numOfOperations := 50
	numOfGoroutines := 50

	for op := 0; op < numOfOperations; op++ {
		// Only when all numOfGoroutines requests of this batch have returned do we run the next
		// batch. Each batch is expected to run in a newly created operation.
		var wg sync.WaitGroup
		wg.Add(numOfGoroutines)

		for i := 0; i < numOfGoroutines; i++ {
			go func(id string) {
				defer wg.Done()

				res, err := fnl.Execute(id, func() (interface{}, error) {
					// Counts how many times the funnel actually started executing the operation.
					atomic.AddUint64(&numOfStartedOperations, 1)

					// Sleeps longer than the 50ms funnel timeout so every request times out.
					time.Sleep(time.Millisecond * 100)
					return id + "ended successfully", errors.New("no error")
				})

				if res == nil && err == timeoutError {
					atomic.AddUint64(&numOfGoREndWithTimeout, 1)
				}
			}("StaticOperationId")
		}
		wg.Wait()
	}

	// Every request in every batch must have ended with the timeout error.
	numOfGoREndWithTimeoutFinal := atomic.LoadUint64(&numOfGoREndWithTimeout)
	if int(numOfGoREndWithTimeoutFinal) != numOfOperations*numOfGoroutines {
		t.Error("Number of operations that ended with timeout expired is not as expected, expected ", numOfOperations*numOfGoroutines, ", got ", numOfGoREndWithTimeoutFinal)
	}

	// Exactly one execution must have been started per batch. Report the atomically loaded
	// snapshot that was compared, not the raw counter.
	numOfStartedOperationsFinal := atomic.LoadUint64(&numOfStartedOperations)
	if int(numOfStartedOperationsFinal) != numOfOperations {
		t.Error("Number of operation execution starts is not as expected, expected ", numOfOperations, ", got ", numOfStartedOperationsFinal)
	}
}

/*
All operation execution requests on the same operation instance should time out at the same time. The expiry time is determined by the timeout parameter and the time of the first execution request.
*/
func TestOperationAbsoluteTimeout(t *testing.T) {
funnelTimeout := time.Duration(500 * time.Millisecond)
operationSleepTime := time.Duration(550 * time.Millisecond)
numOfOperationRequests :=10
requestDelay:=time.Duration(30 * time.Millisecond)
operationId := "TestUnifiedTimeout"

fnl := New(WithTimeout(funnelTimeout))

var wg sync.WaitGroup
wg.Add(numOfOperationRequests)

start:=time.Now()
for i:=0; i < numOfOperationRequests; i++ {
go func() {
defer wg.Done()
fnl.Execute(operationId, func() (interface{}, error) {
time.Sleep(operationSleepTime)
return operationId + "ended successfully", errors.New("no error")
})
}()
time.Sleep(requestDelay)
}

wg.Wait()

elapsedTimeAllRequests :=time.Since(start)
expectedOperationTimeoutWithGrace:= funnelTimeout + time.Duration(100*time.Millisecond)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expectedOperationTimeoutWithGrace would be 600 milliseconds, while operationSleepTime is 550 milliseconds, so the elapsed-time check also passes when the operation simply runs to completion. The test doesn't ensure the operation ended with a timeout.
I suggest validating that a timeout error is returned for each request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming the funnel timeout is working. Perhaps we need additional tests to ensure this.


if elapsedTimeAllRequests > expectedOperationTimeoutWithGrace {
t.Error("Expected all operation request to timeout at the same time, funnelTimeout", funnelTimeout, " Elapsed time for all operations", elapsedTimeAllRequests)
}
}