Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test for blocking method #16

Merged
merged 1 commit into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions go/changefeed/blocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package changefeed

import (
"context"
"database/sql"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vippsas/mssql-changefeed/go/changefeed/sqltest"
"sync"
"testing"
)

Expand Down Expand Up @@ -79,3 +83,193 @@ alter role [changefeed.writers:myservice.TestSerializeWriters] add member myuser
assert.Less(t, binary.BigEndian.Uint64(ulids[0].EventID[:8]), binary.BigEndian.Uint64(ulids[4].EventID[:8]))

}

type blockingLoadTest struct {
description string
writerCount, readerCount, eventCountPerThread int
writeInParallel, readInParallel, readAfterWrite bool
}

func (tc blockingLoadTest) Run(t *testing.T) {
_, err := fixture.AdminDB.ExecContext(context.Background(), `
truncate table myservice.TestLoadBlocking;
truncate table [changefeed].[state:myservice.TestLoadBlocking];
`)
require.NoError(t, err)

var writerErr, readerErr error
var wg sync.WaitGroup
wg.Add(tc.writerCount)

// Run writerCount threads in parallel to write new events

writeThread := func(ithread int) {
defer wg.Done()
for j := 0; j != tc.eventCountPerThread; j++ {
_, err := fixture.UserDB.ExecContext(context.TODO(), `
begin try
begin transaction
exec [changefeed].[lock:myservice.TestLoadBlocking] @shard_id = 0

insert into myservice.TestLoadBlocking(ULID, Thread, Number)
values (changefeed.ulid(0), @p1, @p2);
commit
end try
begin catch
if @@trancount > 0 rollback;
end catch
`, ithread, j)
if err != nil {
writerErr = err
return
}
}
}
for i := 0; i != tc.writerCount; i++ {
if tc.writeInParallel {
go writeThread(i)
} else {
writeThread(i)
}
}

if tc.readAfterWrite {
wg.Wait()
wg = sync.WaitGroup{}
}

readThread := func(ithread int) {
defer wg.Done()

type Row struct {
ULID string
Thread int
Number int
}

events := make([][]int, tc.writerCount)
var cursor string
cursor = "0x00"

done := 0
sameCursorCount := 0
for done < tc.writerCount {
if readerErr != nil {
return
}

previousCursor := cursor

rows, err := sqltest.StructSlice2[Row](context.TODO(), fixture.ReadUserDB, `
select top(10) convert(varchar(max), ULID, 1) as ULID, Thread, Number from myservice.TestLoadBlocking
where ULID > convert(binary(16), @cursor, 1);
`, sql.Named("cursor", cursor))
if err != nil {
readerErr = err
return
}
for _, r := range rows {
events[r.Thread] = append(events[r.Thread], r.Number)
if len(events[r.Thread]) > 1 {
lst := events[r.Thread]
if lst[len(lst)-1] != lst[len(lst)-2]+1 {
readerErr = fmt.Errorf("%03d: did not consume events in order", ithread)
fmt.Printf("==== ERROR in %03d: Not consuming events in order, got %d after %d, cursor is %s\n",
ithread,
lst[len(lst)-1],
lst[len(lst)-2],
cursor)
return
}
}
if len(events[r.Thread]) == tc.eventCountPerThread {
done++
}
cursor = r.ULID
}

if previousCursor == cursor {
// no progress... are we waiting for writers or are we truly done?
feedSize := sqltest.QueryInt(fixture.AdminDB, `select count(*) from myservice.TestLoadBlocking`)
if feedSize == tc.writerCount*tc.eventCountPerThread {
sameCursorCount++
if sameCursorCount > 10 {
readerErr = fmt.Errorf("%03d: stuck at end of feed without having consumed all events", ithread)
fmt.Println(events[0])
return
}
}
} else {
sameCursorCount = 0
}

s := ""
for i := 0; i != tc.writerCount; i++ {
s = s + " " + fmt.Sprintf("%d", len(events[i]))
}
fmt.Printf("%03d: %s (cursor=%s)\n", ithread, s, cursor)

}

// Once everything has been consumed, assert that all events are here and in order...
for i := 0; i != tc.writerCount; i++ {
if len(events[i]) != tc.eventCountPerThread {
readerErr = errors.Errorf("wrong count in events[i]")
return
}
for j := 0; j != tc.eventCountPerThread; j++ {
if events[i][j] != j {
readerErr = errors.Errorf("events[i][j] != j")
}
}
}
}

// Run readerCount readers. They read all events, bins them by AggregateID, and checks order within each AggregateID
wg.Add(tc.readerCount)
for i := 0; i != tc.readerCount; i++ {
if tc.readInParallel {
go readThread(i)
} else {
readThread(i)
}
}

wg.Wait()

require.NoError(t, writerErr)
require.NoError(t, readerErr)

}

func TestLoadBlocking(t *testing.T) {
_, err := fixture.AdminDB.ExecContext(context.Background(), `
exec [changefeed].setup_feed 'myservice.TestLoadBlocking', @blocking = 1;
alter role [changefeed.writers:myservice.TestLoadBlocking] add member myuser;
`)
require.NoError(t, err)

for _, tc := range []blockingLoadTest{
{
description: "small fully parallel test",
writerCount: 10,
readerCount: 5,
eventCountPerThread: 100,
writeInParallel: true,
readInParallel: true,
readAfterWrite: false,
},
{
description: "heavy fully parallel test",
writerCount: 10,
readerCount: 10,
eventCountPerThread: 200,
writeInParallel: true,
readInParallel: true,
readAfterWrite: false,
},
} {
t.Run(tc.description, tc.Run)
}

}
7 changes: 7 additions & 0 deletions go/changefeed/testdata/mytable.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ create table myservice.TestLoadOutbox (
primary key (AggregateID, Version)
);

create table myservice.TestLoadBlocking (
ULID binary(16) not null,
Thread bigint not null,
Number bigint not null,
primary key (ULID)
);

create table myservice.MyTable (
MyAggregateID bigint not null,
Version int not null,
Expand Down