diff --git a/funnel.go b/funnel.go index 8ebc9d5..3a14e27 100644 --- a/funnel.go +++ b/funnel.go @@ -9,10 +9,14 @@ import ( "errors" "sync" "time" - + "github.com/mohae/deepcopy" + "github.com/tevino/abool" ) +//const error +var timeoutError = errors.New("Timeout expired while waiting for operation execution to complete") + // opResult holds the result from executing of operation type opResult struct { @@ -38,6 +42,12 @@ type operationInProcess struct { // The result from executing of operation, will be available after the channel will be closed. opResult + + // true when this operation has been deleted from the funnel + deleted *abool.AtomicBool + + // Time at which this operation started executing + startTime time.Time } // A Config structure is used to configure the Funnel @@ -104,14 +114,18 @@ func New(option ...Option) *Funnel { // Waiting for completion of the operation and then returns the operation's result or error in case of timeout. func (op *operationInProcess) wait(timeout time.Duration) (res interface{}, err error) { + + operationElapsedTime := time.Since(op.startTime) + operationTimeoutRemaining := timeout - operationElapsedTime + select { case <-op.done: if op.panicErr != nil { // If the operation ended with panic, this pending request also ends the same way. panic(op.panicErr) } return op.res, op.err - case <-time.After(timeout): - return nil, errors.New("Timeout expired when waiting to operation in process") + case <-time.After(operationTimeoutRemaining): + return nil, timeoutError } } @@ -129,13 +143,15 @@ func (f *Funnel) getOperationInProcess(operationId string, opExeFunc func() (int op = &operationInProcess{ operationId: operationId, done: make(chan empty), + startTime: time.Now(), + deleted: abool.New(), } f.opInProcess[operationId] = op // Executing the operation go func(opInProc *operationInProcess) { // closeOperation must be performed within defer function to ensure the closure of the channel. - defer f.closeOperation(opInProc.operationId) + defer f.closeOperation(opInProc) opInProc.res, opInProc.err = opExeFunc() }(op) @@ -143,13 +159,13 @@ func (f *Funnel) getOperationInProcess(operationId string, opExeFunc func() (int } // Closes the operation by updates the operation's result and closure of done channel. -func (f *Funnel) closeOperation(operationId string) { +func (f *Funnel) closeOperation(op *operationInProcess) { f.Lock() defer f.Unlock() - op, found := f.opInProcess[operationId] - if !found { - panic("unexpected behavior, operation id not found") + //Check if the operation completed after a timeout which would result in the operation being deleted from the funnel. + if op.deleted.IsSet() { + return } if rr := recover(); rr != nil { @@ -159,7 +175,7 @@ func (f *Funnel) closeOperation(operationId string) { // Deletion of operationInProcess from the map will occur only when the cache time-to-live will be expired. go func() { time.Sleep(f.config.cacheTtl) - f.deleteOperation(operationId) + f.deleteOperation(op) }() // Releases all the goroutines which are waiting for the operation result. @@ -169,10 +185,19 @@ func (f *Funnel) closeOperation(operationId string) { // Delete the operation from the map. // Once deleted, we do not hold the operation's result anymore, therefore any further request for the // same operation will require re-execution of it. -func (f *Funnel) deleteOperation(operationId string) { +func (f *Funnel) deleteOperation(operation *operationInProcess) { + if operation.deleted.IsSet() { + return + } + f.Lock() defer f.Unlock() - delete(f.opInProcess, operationId) + + //each timeout will call deleteOperation. Only the first timeout should carry out deletion since a stalled app may delete a recreated operation with the same id. + if !operation.deleted.IsSet() { + delete(f.opInProcess, operation.operationId) + operation.deleted.SetTo(true) + } } // Execute receives an identifier of the operation and a callback function to execute. @@ -183,6 +208,9 @@ func (f *Funnel) deleteOperation(operationId string) { func (f *Funnel) Execute(operationId string, opExeFunc func() (interface{}, error)) (res interface{}, err error) { op := f.getOperationInProcess(operationId, opExeFunc) res, err = op.wait(f.config.timeout) // Waiting for completion of operation + if err == timeoutError { + f.deleteOperation(op) + } return } diff --git a/funnel_test.go b/funnel_test.go index e34044e..d383817 100644 --- a/funnel_test.go +++ b/funnel_test.go @@ -106,9 +106,10 @@ func TestEndsWithPanic(t *testing.T) { t.Error("unexpected panic message") } atomic.AddUint64(&numOfGoREndWithPanic, 1) + wg.Done() } }() - defer wg.Done() + if numOfGoroutine%2 == 1 { time.Sleep(time.Millisecond * 500) } @@ -150,7 +151,7 @@ func TestWithTimeout(t *testing.T) { return id + "ended successfully", errors.New("no error") }) - if res == nil && err.Error() == "Timeout expired when waiting to operation in process" { + if res == nil && err == timeoutError { atomic.AddUint64(&numOfGoREndWithTimeout, 1) } @@ -165,3 +166,92 @@ func TestWithTimeout(t *testing.T) { t.Error("Number of operations that ended with timeout expired is not as expected, expected ", numOfOperations*numOfGoroutines, ", got ", numOfGoREndWithTimeoutFinal) } } + +/* +The following tests the ability of the funnel to create a new operation while an operation of the same id has timed out and its execution function is still running +An unexpired cacheTTL on a timedout operation should also not prohibit creating the new operation + */ +func TestWithTimedoutReruns(t *testing.T) { + fnl := New(WithTimeout(time.Millisecond * 50), WithCacheTtl(time.Millisecond * 100)) + + var numOfGoREndWithTimeout uint64 = 0 + var numOfStartedOperations uint64 = 0 + + numOfOperations := 50 + numOfGoroutines := 50 + + + for op := 0; op < numOfOperations; op++ { + var wg sync.WaitGroup + wg.Add(numOfGoroutines) //Only when numOfGoroutines operations time out we run the next batch of numOfGoroutines operations. Each such batch is expected to run in a newly created operation request + + for i := 0; i < numOfGoroutines; i++ { + go func(numOfGoroutine int, id string) { + defer wg.Done() + + res, err := fnl.Execute(id, func() (interface{}, error) { + atomic.AddUint64(&numOfStartedOperations, 1) + + time.Sleep(time.Millisecond * 100 ) + return id + "ended successfully", errors.New("no error") + }) + + if res == nil && err == timeoutError { + atomic.AddUint64(&numOfGoREndWithTimeout, 1) + } + + }(i, "StaticOperationId") + + } + wg.Wait() + } + + numOfGoREndWithTimeoutFinal := atomic.LoadUint64(&numOfGoREndWithTimeout) + if int(numOfGoREndWithTimeoutFinal) != numOfOperations*numOfGoroutines { + t.Error("Number of operations that ended with timeout expired is not as expected, expected ", numOfOperations*numOfGoroutines, ", got ", numOfGoREndWithTimeoutFinal) + + } + + numOfStartedOperationsFinal := atomic.LoadUint64(&numOfStartedOperations) + if int(numOfStartedOperationsFinal) != numOfOperations{ + t.Error("Number of operation execution starts is not as expected, expected ", numOfOperations, ", got ", numOfStartedOperations) + + } +} + +/* + All operation execution requests on the same operation instance should timeout at the same time. The expiry time is determined by the timeout parameter and the time of the first execution request. + */ +func TestOperationAbsoluteTimeout(t *testing.T) { + funnelTimeout := time.Duration(500 * time.Millisecond) + operationSleepTime := time.Duration(550 * time.Millisecond) + numOfOperationRequests :=10 + requestDelay:=time.Duration(30 * time.Millisecond) + operationId := "TestUnifiedTimeout" + + fnl := New(WithTimeout(funnelTimeout)) + + var wg sync.WaitGroup + wg.Add(numOfOperationRequests) + + start:=time.Now() + for i:=0; i < numOfOperationRequests; i++ { + go func() { + defer wg.Done() + fnl.Execute(operationId, func() (interface{}, error) { + time.Sleep(operationSleepTime) + return operationId + "ended successfully", errors.New("no error") + }) + }() + time.Sleep(requestDelay) + } + + wg.Wait() + + elapsedTimeAllRequests :=time.Since(start) + expectedOperationTimeoutWithGrace:= funnelTimeout + time.Duration(100*time.Millisecond) + + if elapsedTimeAllRequests > expectedOperationTimeoutWithGrace { + t.Error("Expected all operation request to timeout at the same time, funnelTimeout", funnelTimeout, " Elapsed time for all operations", elapsedTimeAllRequests) + } +}