Skip to content

Commit

Permalink
Merge pull request #15603 from serathius/robustness-finish-with-success
Browse files Browse the repository at this point in the history
tests: Ensure that operation history finishes with successful request
  • Loading branch information
serathius authored Apr 4, 2023
2 parents 32acc66 + 6a5d326 commit 523f235
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
8 changes: 3 additions & 5 deletions tests/robustness/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,17 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
}

func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
g := errgroup.Group{}
trafficCtx, trafficCancel := context.WithCancel(ctx)
finishTraffic := make(chan struct{})
g.Go(func() error {
triggerFailpoints(ctx, t, lg, clus, failpoint)
time.Sleep(time.Second)
trafficCancel()
close(finishTraffic)
return nil
})
maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
operations = simulateTraffic(trafficCtx, t, lg, clus, traffic)
time.Sleep(time.Second)
operations = simulateTraffic(ctx, t, lg, clus, traffic, finishTraffic)
maxRevisionChan <- operationsMaxRevision(operations)
return nil
})
Expand Down
27 changes: 21 additions & 6 deletions tests/robustness/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (
Defragment TrafficRequestType = "defragment"
)

func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig) []porcupine.Operation {
func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig, finish <-chan struct{}) []porcupine.Operation {
mux := sync.Mutex{}
endpoints := clus.EndpointsGRPC()

Expand All @@ -64,26 +64,39 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)

startTime := time.Now()
cc, err := NewClient(endpoints, ids, startTime)
if err != nil {
t.Fatal(err)
}
defer cc.Close()
wg := sync.WaitGroup{}
for i := 0; i < config.clientCount; i++ {
wg.Add(1)
endpoints := []string{endpoints[i%len(endpoints)]}
c, err := NewClient(endpoints, ids, startTime)
c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, startTime)
if err != nil {
t.Fatal(err)
}
go func(c *recordingClient, clientId int) {
defer wg.Done()
defer c.Close()

config.traffic.Run(ctx, clientId, c, limiter, ids, lm)
config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish)
mux.Lock()
h = h.Merge(c.history.History)
mux.Unlock()
}(c, i)
}
wg.Wait()
endTime := time.Now()

// Ensure that last operation is succeeds
time.Sleep(time.Second)
err = cc.Put(ctx, "tombstone", "true")
if err != nil {
t.Error(err)
}
h = h.Merge(cc.history.History)

operations := h.Operations()
lg.Info("Recorded operations", zap.Int("count", len(operations)))

Expand All @@ -104,7 +117,7 @@ type trafficConfig struct {
}

type Traffic interface {
Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage)
Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{})
}

type traffic struct {
Expand All @@ -119,12 +132,14 @@ type requestChance struct {
chance int
}

func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {
func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {

for {
select {
case <-ctx.Done():
return
case <-finish:
return
default:
}
key := fmt.Sprintf("%d", rand.Int()%t.keyCount)
Expand Down

0 comments on commit 523f235

Please sign in to comment.