Skip to content

Commit

Permalink
Combine watch responses for concurrent client requests and find last …
Browse files Browse the repository at this point in the history
…revision based one time of response

Signed-off-by: ah8ad3 <ah8ad3@gmail.com>
  • Loading branch information
ah8ad3 committed Jul 3, 2024
1 parent caaf1f0 commit a6ec18c
Showing 1 changed file with 39 additions and 7 deletions.
46 changes: 39 additions & 7 deletions tests/robustness/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package robustness

import (
"context"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -144,25 +145,56 @@ external:
}
}

type timeLastRevision struct {
time time.Duration
lastRevision int64
}

func combineWatchResponses(reports []report.ClientReport) map[uint64][]timeLastRevision {
result := make(map[uint64][]timeLastRevision)
for _, r := range reports {
for _, op := range r.Watch {
for _, resp := range op.Responses {
if len(resp.Events) == 0 {
continue
}
result[resp.MemberId] = append(result[resp.MemberId], timeLastRevision{time: resp.Time, lastRevision: resp.Events[len(resp.Events)-1].Revision})
}
}
}
for memberId, structs := range result {
sort.Slice(structs, func(i, j int) bool {
return structs[i].time < structs[j].time
})
result[memberId] = structs
}
return result
}

func validateWatchSequential(t *testing.T, reports []report.ClientReport) {
combinedWatchResponses := combineWatchResponses(reports)
for _, r := range reports {
for _, op := range r.Watch {
if op.Request.Revision != 0 {
continue
}
lastEventRevision := make(map[uint64]int64)
for _, resp := range op.Responses {
if len(resp.Events) == 0 {
continue
}
if _, ok := lastEventRevision[resp.MemberId]; !ok {
lastEventRevision[resp.MemberId] = 1
var lastMemberWatchRevision int64
for i, c := range combinedWatchResponses[resp.MemberId] {
// Reports are sorted by time, find first greater or equal and use previous one.
if resp.Time >= c.time {
if i == 0 {
continue
}
lastMemberWatchRevision = combinedWatchResponses[resp.MemberId][i-1].lastRevision
}
}
firstEventRevision := resp.Events[0].Revision
if firstEventRevision < lastEventRevision[resp.MemberId] {
t.Errorf("Error watch sequential, expect: %v or higher, got: %v, member id: %v", lastEventRevision[resp.MemberId], firstEventRevision, resp.MemberId)
if resp.Events[0].Revision < lastMemberWatchRevision {
t.Errorf("Error watch is not sequential, expect: %v or higher, got: %v, member id: %v", lastMemberWatchRevision, resp.Events[0].Revision, resp.MemberId)
}
lastEventRevision[resp.MemberId] = resp.Events[len(resp.Events)-1].Revision
}
}
}
Expand Down

0 comments on commit a6ec18c

Please sign in to comment.