Skip to content

Commit

Permalink
This PR fixes a bug that occurs when a cached operation has completed… (
Browse files Browse the repository at this point in the history
#15)

* This PR fixes a bug that occurs when a cached operation has completed and the timeout period has expired. When this happens, the select in wait() can't be trusted to return correctly: a negative duration causes time.After() to fire immediately, and the runtime can randomly choose either case. This leads to random timeout errors when a value is cached.

* Moved wg.Add to avoid possible race condition.

* Moved completed.IsSet() to wait to avoid potential race condition.

* Moved comment

Co-authored-by: ygonzalez <yoel.gonzalez@mailchimp.com>
  • Loading branch information
ygonzal6 and ygonzal6 authored Oct 12, 2022
1 parent d13eafa commit 2ae131f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 15 deletions.
10 changes: 9 additions & 1 deletion funnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type operationInProcess struct {

// Time at which this operation started executing
startTime time.Time

// Operation will be marked completed once a result is returned
completed *abool.AtomicBool
}

// A Config structure is used to configure the Funnel
Expand Down Expand Up @@ -114,7 +117,6 @@ func New(option ...Option) *Funnel {

// wait waits for the operation to complete and then returns its result, or an error in case of timeout.
func (op *operationInProcess) wait(timeout time.Duration) (res interface{}, err error) {

operationElapsedTime := time.Since(op.startTime)
operationTimeoutRemaining := timeout - operationElapsedTime

Expand All @@ -125,6 +127,9 @@ func (op *operationInProcess) wait(timeout time.Duration) (res interface{}, err
}
return op.res, op.err
case <-time.After(operationTimeoutRemaining):
if op.completed.IsSet() {
return op.res, op.err
}
return nil, timeoutError
}
}
Expand All @@ -145,6 +150,7 @@ func (f *Funnel) getOperationInProcess(operationId string, opExeFunc func() (int
done: make(chan empty),
startTime: time.Now(),
deleted: abool.New(),
completed: abool.New(),
}
f.opInProcess[operationId] = op

Expand All @@ -153,6 +159,7 @@ func (f *Funnel) getOperationInProcess(operationId string, opExeFunc func() (int
// closeOperation must be performed within defer function to ensure the closure of the channel.
defer f.closeOperation(opInProc)
opInProc.res, opInProc.err = opExeFunc()
opInProc.completed.Set()
}(op)

return op
Expand Down Expand Up @@ -207,6 +214,7 @@ func (f *Funnel) deleteOperation(operation *operationInProcess) {
// Use ExecuteAndCopyResult to return a dedicated (copied) object.
func (f *Funnel) Execute(operationId string, opExeFunc func() (interface{}, error)) (res interface{}, err error) {
op := f.getOperationInProcess(operationId, opExeFunc)

res, err = op.wait(f.config.timeout) // Waiting for completion of operation
if err == timeoutError {
f.deleteOperation(op)
Expand Down
53 changes: 39 additions & 14 deletions funnel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package funnel

import (
"errors"
"github.com/stretchr/testify/assert"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestBasic(t *testing.T) {
Expand Down Expand Up @@ -37,9 +38,7 @@ func TestBasic(t *testing.T) {
if res != id+"ended successfully" || err.Error() != "no error" {
t.Error("The results of operations is not as expected")
}

}(i, opId)

}
}

Expand Down Expand Up @@ -76,9 +75,7 @@ func TestWithCacheTtl(t *testing.T) {
if res != id+"ended successfully" || err.Error() != "no error" {
t.Error("The results of operations is not as expected")
}

}(i, opId)

}
}

Expand Down Expand Up @@ -121,9 +118,7 @@ func TestEndsWithPanic(t *testing.T) {
})

t.Error("Should not reach this line because panic should occur")

}(i, opId)

}
}

Expand Down Expand Up @@ -156,9 +151,7 @@ func TestWithTimeout(t *testing.T) {
if res == nil && err == timeoutError {
atomic.AddUint64(&numOfGoREndWithTimeout, 1)
}

}(i, opId)

}
}

Expand All @@ -184,7 +177,7 @@ func TestWithTimedoutReruns(t *testing.T) {

for op := 0; op < numOfOperations; op++ {
var wg sync.WaitGroup
wg.Add(numOfGoroutines) //Only when numOfGoroutines operations time out we run the next batch of numOfGoroutines operations. Each such batch is expected to run in a newly created operation request
wg.Add(numOfGoroutines) // Only when numOfGoroutines operations time out we run the next batch of numOfGoroutines operations. Each such batch is expected to run in a newly created operation request

for i := 0; i < numOfGoroutines; i++ {
go func(numOfGoroutine int, id string) {
Expand All @@ -200,23 +193,19 @@ func TestWithTimedoutReruns(t *testing.T) {
if res == nil && err == timeoutError {
atomic.AddUint64(&numOfGoREndWithTimeout, 1)
}

}(i, "StaticOperationId")

}
wg.Wait()
}

numOfGoREndWithTimeoutFinal := atomic.LoadUint64(&numOfGoREndWithTimeout)
if int(numOfGoREndWithTimeoutFinal) != numOfOperations*numOfGoroutines {
t.Error("Number of operations that ended with timeout expired is not as expected, expected ", numOfOperations*numOfGoroutines, ", got ", numOfGoREndWithTimeoutFinal)

}

numOfStartedOperationsFinal := atomic.LoadUint64(&numOfStartedOperations)
if int(numOfStartedOperationsFinal) != numOfOperations {
t.Error("Number of operation execution starts is not as expected, expected ", numOfOperations, ", got ", numOfStartedOperations)

}
}

Expand Down Expand Up @@ -313,5 +302,41 @@ func TestExecuteAndCopyResult(t *testing.T) {
assert.Equal(t, *num1, *num2, "Objects' values are expected to be the same.")

assert.False(t, num1 == num2, "Objects' addresses are expected to be different. addresses received:", num1, ",", num2)
}

// TestCachedValued verifies that Execute never reports timeoutError for an
// operation whose result is already cached, even when the calls are made
// after the configured timeout has elapsed since the operation started.
func TestCachedValued(t *testing.T) {
	opId := "opId"
	f := New(WithCacheTtl(time.Hour), WithTimeout(100*time.Millisecond))

	// Prime the cache; every later Execute call with the same id is expected
	// to be served from this completed operation.
	if _, err := f.Execute(opId, func() (interface{}, error) {
		return nil, nil
	}); err != nil {
		t.Fatal("priming the cache failed: ", err)
	}

	numOfGoroutines := 1000
	var wg sync.WaitGroup

	// Set to 1 by any goroutine that observes a timeout. It is accessed
	// atomically because it is written by the workers and read by the test
	// goroutine concurrently.
	var falseTimeout uint32

	for i := 0; i < numOfGoroutines; i++ {
		// Sleep for as long as the configured timeout so that the calls below
		// are made once the timeout period of the cached operation has passed.
		if i%3 == 0 {
			time.Sleep(100 * time.Millisecond)
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, err := f.Execute(opId, func() (interface{}, error) {
				return nil, nil
			})
			if errors.Is(err, timeoutError) {
				atomic.StoreUint32(&falseTimeout, 1)
			}
		}()
		// Stop spawning early once a failure has been observed.
		if atomic.LoadUint32(&falseTimeout) != 0 {
			break
		}
	}

	// Check only after all goroutines have finished, so that a failure
	// observed by the last spawned goroutines is not missed.
	wg.Wait()
	if atomic.LoadUint32(&falseTimeout) != 0 {
		// result should have been cached
		t.Error("false timeout error")
	}
}

0 comments on commit 2ae131f

Please sign in to comment.