From 2ae131fd57ad3402ef91cff0f10dfe327d2500b0 Mon Sep 17 00:00:00 2001 From: ygonzal6 <115563695+ygonzal6@users.noreply.github.com> Date: Wed, 12 Oct 2022 10:08:40 -0400 Subject: [PATCH] =?UTF-8?q?This=20PR=20fixes=20a=20bug=20that=20occurs=20w?= =?UTF-8?q?hen=20a=20cached=20operation=20has=20completed=E2=80=A6=20(#15)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * This PR fixes a bug that occurs when a cached operation has completed and the timout period has expired. When this happens select in wait() can't be trusted to return correctly, as a negative duration will lead to time.After() firing immediately and the runtime can randomly chose either case. This leads to random timeout errors when a value is cached. * Moved wg.Add to avoid possible race condition. * Moved completed.IsSet() to wait to avoid potential race condition. * Moved comment Co-authored-by: ygonzalez --- funnel.go | 10 +++++++++- funnel_test.go | 53 +++++++++++++++++++++++++++++++++++++------------- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/funnel.go b/funnel.go index 1bc420a..fa8e6a5 100644 --- a/funnel.go +++ b/funnel.go @@ -48,6 +48,9 @@ type operationInProcess struct { // Time at which this operation started executing startTime time.Time + + // Operation will be marked completed once a result is returned + completed *abool.AtomicBool } // A Config structure is used to configure the Funnel @@ -114,7 +117,6 @@ func New(option ...Option) *Funnel { // Waiting for completion of the operation and then returns the operation's result or error in case of timeout. func (op *operationInProcess) wait(timeout time.Duration) (res interface{}, err error) { - operationElapsedTime := time.Since(op.startTime) operationTimeoutRemaining := timeout - operationElapsedTime @@ -125,6 +127,9 @@ func (op *operationInProcess) wait(timeout time.Duration) (res interface{}, err } return op.res, op.err case <-time.After(operationTimeoutRemaining): + if op.completed.IsSet() { + return op.res, op.err + } return nil, timeoutError } } @@ -145,6 +150,7 @@ func (f *Funnel) getOperationInProcess(operationId string, opExeFunc func() (int done: make(chan empty), startTime: time.Now(), deleted: abool.New(), + completed: abool.New(), } f.opInProcess[operationId] = op @@ -153,6 +159,7 @@ func (f *Funnel) getOperationInProcess(operationId string, opExeFunc func() (int // closeOperation must be performed within defer function to ensure the closure of the channel. defer f.closeOperation(opInProc) opInProc.res, opInProc.err = opExeFunc() + opInProc.completed.Set() }(op) return op @@ -207,6 +214,7 @@ func (f *Funnel) deleteOperation(operation *operationInProcess) { // Use ExecuteAndCopyResult to return a dedicated (copied) object. func (f *Funnel) Execute(operationId string, opExeFunc func() (interface{}, error)) (res interface{}, err error) { op := f.getOperationInProcess(operationId, opExeFunc) + res, err = op.wait(f.config.timeout) // Waiting for completion of operation if err == timeoutError { f.deleteOperation(op) diff --git a/funnel_test.go b/funnel_test.go index 73d5e3b..327197d 100644 --- a/funnel_test.go +++ b/funnel_test.go @@ -2,13 +2,14 @@ package funnel import ( "errors" - "github.com/stretchr/testify/assert" "math/rand" "strconv" "sync" "sync/atomic" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestBasic(t *testing.T) { @@ -37,9 +38,7 @@ func TestBasic(t *testing.T) { if res != id+"ended successfully" || err.Error() != "no error" { t.Error("The results of operations is not as expected") } - }(i, opId) - } } @@ -76,9 +75,7 @@ func TestWithCacheTtl(t *testing.T) { if res != id+"ended successfully" || err.Error() != "no error" { t.Error("The results of operations is not as expected") } - }(i, opId) - } } @@ -121,9 +118,7 @@ func TestEndsWithPanic(t *testing.T) { }) t.Error("Should not reach this line because panic should occur") - }(i, opId) - } } @@ -156,9 +151,7 @@ func TestWithTimeout(t *testing.T) { if res == nil && err == timeoutError { atomic.AddUint64(&numOfGoREndWithTimeout, 1) } - }(i, opId) - } } @@ -184,7 +177,7 @@ func TestWithTimedoutReruns(t *testing.T) { for op := 0; op < numOfOperations; op++ { var wg sync.WaitGroup - wg.Add(numOfGoroutines) //Only when numOfGoroutines operations time out we run the next batch of numOfGoroutines operations. Each such batch is expected to run in a newly created operation request + wg.Add(numOfGoroutines) // Only when numOfGoroutines operations time out we run the next batch of numOfGoroutines operations. Each such batch is expected to run in a newly created operation request for i := 0; i < numOfGoroutines; i++ { go func(numOfGoroutine int, id string) { @@ -200,9 +193,7 @@ func TestWithTimedoutReruns(t *testing.T) { if res == nil && err == timeoutError { atomic.AddUint64(&numOfGoREndWithTimeout, 1) } - }(i, "StaticOperationId") - } wg.Wait() } @@ -210,13 +201,11 @@ func TestWithTimedoutReruns(t *testing.T) { numOfGoREndWithTimeoutFinal := atomic.LoadUint64(&numOfGoREndWithTimeout) if int(numOfGoREndWithTimeoutFinal) != numOfOperations*numOfGoroutines { t.Error("Number of operations that ended with timeout expired is not as expected, expected ", numOfOperations*numOfGoroutines, ", got ", numOfGoREndWithTimeoutFinal) - } numOfStartedOperationsFinal := atomic.LoadUint64(&numOfStartedOperations) if int(numOfStartedOperationsFinal) != numOfOperations { t.Error("Number of operation execution starts is not as expected, expected ", numOfOperations, ", got ", numOfStartedOperations) - } } @@ -313,5 +302,41 @@ func TestExecuteAndCopyResult(t *testing.T) { assert.Equal(t, *num1, *num2, "Objects' values are expected to be the same.") assert.False(t, num1 == num2, "Objects' addresses are expected to be different. addresses received:", num1, ",", num2) +} + +// Test cached value +func TestCachedValued(t *testing.T) { + opId := "opId" + f := New(WithCacheTtl(time.Hour), WithTimeout(100*time.Millisecond)) + // cache value + f.Execute(opId, func() (interface{}, error) { + return nil, nil + }) + + numOfGoroutines := 1000 + wg := sync.WaitGroup{} + failedExecute := false + + for i := 0; i < numOfGoroutines; i++ { + if i%3 == 0 { + time.Sleep(100 * time.Millisecond) + } + wg.Add(1) + go func() { + _, err := f.Execute(opId, func() (interface{}, error) { + return nil, nil + }) + if err != nil && errors.Is(err, timeoutError) { + failedExecute = true + } + wg.Done() + }() + // result should have been cached + if failedExecute { + t.Error("false timeout error") + break + } + } + wg.Wait() }