Commit bc5683d

Test and bugfixes for outbox method
dss-vipps committed Oct 17, 2023
1 parent fa638c7 commit bc5683d
Showing 3 changed files with 243 additions and 20 deletions.
204 changes: 204 additions & 0 deletions go/changefeed/outbox_test.go
@@ -6,9 +6,11 @@ import (
"encoding/hex"
"fmt"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vippsas/mssql-changefeed/go/changefeed/sqltest"
"sync"
"testing"
"time"

@@ -132,3 +134,205 @@ insert into [changefeed].[outbox:myservice.TestHappyDay] (shard_id, time_hint, A
}, allEvents)

}

type outboxLoadTest struct {
description string
writerCount, readerCount, eventCountPerThread int
writeInParallel, readInParallel, readAfterWrite bool
}

func (tc outboxLoadTest) Run(t *testing.T) {
_, err := fixture.AdminDB.ExecContext(context.Background(), `
truncate table [changefeed].[feed:myservice.TestLoadOutbox];
truncate table [changefeed].[outbox:myservice.TestLoadOutbox];
truncate table [changefeed].[state:myservice.TestLoadOutbox];
`)
require.NoError(t, err)

var writerErr, readerErr error
var wg sync.WaitGroup
wg.Add(tc.writerCount)

// Run writerCount threads in parallel to write new events

writeThread := func(ithread int) {
defer wg.Done()
for j := 0; j != tc.eventCountPerThread; j++ {
_, err := fixture.UserDB.ExecContext(context.TODO(), `
insert into [changefeed].[outbox:myservice.TestLoadOutbox] (shard_id, time_hint, AggregateID, Version)
values (0, sysutcdatetime(), @p1, @p2);`, ithread, j)
if err != nil {
writerErr = err
return
}
}
}
for i := 0; i != tc.writerCount; i++ {
if tc.writeInParallel {
go writeThread(i)
} else {
writeThread(i)
}
}

if tc.readAfterWrite {
wg.Wait()
wg = sync.WaitGroup{}
}

readThread := func(ithread int) {
defer wg.Done()

type Row struct {
ULID string
AggregateID int
Version int
}

events := make([][]int, tc.writerCount)
cursor := "0x00"

done := 0
sameCursorCount := 0
for done < tc.writerCount {
if readerErr != nil {
return
}

previousCursor := cursor

rows, err := sqltest.StructSlice2[Row](context.TODO(), fixture.ReadUserDB, `
create table #read (
ulid binary(16) not null,
AggregateID bigint not null,
Version int not null
);
declare @cursorBytes binary(16) = convert(binary(16), @cursor, 1)
exec [changefeed].[read_feed:myservice.TestLoadOutbox] 0, @cursorBytes, @pagesize = 10;
select convert(varchar(max), ulid, 1) as ULID, AggregateID, Version from #read order by ulid;
`, sql.Named("cursor", cursor))
if err != nil {
readerErr = err
return
}
for _, r := range rows {
events[r.AggregateID] = append(events[r.AggregateID], r.Version)
if len(events[r.AggregateID]) > 1 {
lst := events[r.AggregateID]
if lst[len(lst)-1] != lst[len(lst)-2]+1 {
readerErr = fmt.Errorf("%03d: did not consume events in order", ithread)
fmt.Printf("==== ERROR in %03d: Not consuming events in order, got %d after %d, cursor is %s\n",
ithread,
lst[len(lst)-1],
lst[len(lst)-2],
cursor)
return
}
}
if len(events[r.AggregateID]) == tc.eventCountPerThread {
done++
}
cursor = r.ULID
}

if previousCursor == cursor {
// no progress... are we waiting for writers or are we truly done?
feedSize := sqltest.QueryInt(fixture.AdminDB, `select count(*) from changefeed.[feed:myservice.TestLoadOutbox]`)
if feedSize == tc.writerCount*tc.eventCountPerThread {
sameCursorCount++
if sameCursorCount > 10 {
readerErr = fmt.Errorf("%03d: stuck at end of feed without having consumed all events", ithread)
fmt.Println(events[0])
return
}
}
} else {
sameCursorCount = 0
}

// Progress output; useful when debugging:

outboxSize := sqltest.QueryInt(fixture.AdminDB, `select count(*) from changefeed.[outbox:myservice.TestLoadOutbox]`)
feedSize := sqltest.QueryInt(fixture.AdminDB, `select count(*) from changefeed.[feed:myservice.TestLoadOutbox]`)
maxUlid := sqltest.QueryString(fixture.AdminDB, `select convert(varchar(max), max(ulid), 1) from changefeed.[feed:myservice.TestLoadOutbox]`)
s := ""
for i := 0; i != tc.writerCount; i++ {
s = s + " " + fmt.Sprintf("%d", len(events[i]))
}
fmt.Printf("%03d: %s (cursor=%s, outbox=%d, feed=%d, maxulid=%s)\n", ithread, s, cursor, outboxSize, feedSize, maxUlid)

}

// Once everything has been consumed, assert that all events are here and in order...
for i := 0; i != tc.writerCount; i++ {
if len(events[i]) != tc.eventCountPerThread {
readerErr = errors.Errorf("wrong count in events[i]")
return
}
for j := 0; j != tc.eventCountPerThread; j++ {
if events[i][j] != j {
readerErr = errors.Errorf("events[i][j] != j")
}
}
}
}

// Run readerCount readers. Each reads all events, bins them by AggregateID, and checks ordering within each AggregateID.
wg.Add(tc.readerCount)
for i := 0; i != tc.readerCount; i++ {
if tc.readInParallel {
go readThread(i)
} else {
readThread(i)
}
}

wg.Wait()

require.NoError(t, writerErr)
require.NoError(t, readerErr)

}

func TestLoadOutbox(t *testing.T) {
_, err := fixture.AdminDB.ExecContext(context.Background(), `
exec [changefeed].setup_feed 'myservice.TestLoadOutbox', @outbox = 1;
alter role [changefeed.writers:myservice.TestLoadOutbox] add member myuser;
alter role [changefeed.readers:myservice.TestLoadOutbox] add member myreaduser;
`)
require.NoError(t, err)

for _, tc := range []outboxLoadTest{
{
description: "small fully parallel test",
writerCount: 10,
readerCount: 5,
eventCountPerThread: 100,
writeInParallel: true,
readInParallel: true,
readAfterWrite: false,
},
{
description: "read in parallel after single bit serial write is done (focus on readers)",
writerCount: 1,
readerCount: 20,
eventCountPerThread: 1000,
writeInParallel: false,
readInParallel: true,
readAfterWrite: true,
},
{
description: "heavy fully parallel test",
writerCount: 10,
readerCount: 10,
eventCountPerThread: 200,
writeInParallel: true,
readInParallel: true,
readAfterWrite: false,
},
} {
t.Run(tc.description, tc.Run)
}

}
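A note on the error plumbing above: several writer and reader goroutines assign to the shared writerErr/readerErr variables without synchronization, which the Go race detector would flag. Below is a minimal sketch of the same fan-out pattern made race-free with golang.org/x/sync/errgroup — an illustrative assumption, since the test itself uses plain goroutines and a WaitGroup:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// runThreads fans out count copies of body and returns the first error.
// A sketch only; in the test above, body would be writeThread/readThread.
func runThreads(ctx context.Context, count int, body func(ctx context.Context, i int) error) error {
	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < count; i++ {
		i := i // capture per-iteration value (needed before Go 1.22)
		g.Go(func() error { return body(ctx, i) })
	}
	return g.Wait() // first non-nil error, after all goroutines finish
}

func main() {
	err := runThreads(context.Background(), 10, func(ctx context.Context, i int) error {
		fmt.Println("thread", i)
		return nil
	})
	fmt.Println("err:", err)
}

errgroup also cancels ctx on the first failure, which would replace the manual `if readerErr != nil { return }` polling inside readThread.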
7 changes: 7 additions & 0 deletions go/changefeed/testdata/mytable.sql
@@ -17,6 +17,13 @@ create table myservice.TestHappyDay (
primary key (AggregateID, Version)
);

create table myservice.TestLoadOutbox (
AggregateID bigint not null,
Version int not null,
Data varchar(max) not null,
primary key (AggregateID, Version)
);

create table myservice.MyTable (
MyAggregateID bigint not null,
Version int not null,
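For context on why this table comes paired with [changefeed].[outbox:myservice.TestLoadOutbox]: the outbox pattern has the application insert the domain row and its outbox row in the same transaction, so an event is published if and only if the row was committed. A hedged Go sketch of such a write — the table, role, and outbox column names are from this commit; the helper itself is illustrative:

package main

import (
	"context"
	"database/sql"

	_ "github.com/microsoft/go-mssqldb" // assumed driver choice
)

// insertWithOutbox writes a domain row and its outbox entry atomically.
// The caller's login must be a member of [changefeed.writers:myservice.TestLoadOutbox].
func insertWithOutbox(ctx context.Context, db *sql.DB, aggregateID int64, version int, data string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded

	if _, err := tx.ExecContext(ctx, `
insert into myservice.TestLoadOutbox (AggregateID, Version, Data)
values (@p1, @p2, @p3);`, aggregateID, version, data); err != nil {
		return err
	}
	// Same statement shape as the load test's writer thread.
	if _, err := tx.ExecContext(ctx, `
insert into [changefeed].[outbox:myservice.TestLoadOutbox] (shard_id, time_hint, AggregateID, Version)
values (0, sysutcdatetime(), @p1, @p2);`, aggregateID, version); err != nil {
		return err
	}
	return tx.Commit()
}

func main() {}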
52 changes: 32 additions & 20 deletions migrations/2001.changefeed-v2.sql
@@ -366,22 +366,25 @@ as begin
delete from #read;
-- Fast path: if we are not at the head, do a first attempt without locks.
insert into #read(ulid, ', @pklist,')
select top(@pagesize)
ulid,
', @pklist, '
from ', @feed_table, '
where
shard_id = @shard_id
and ulid > @cursor;
and ulid > @cursor
order by ulid;
if @@rowcount <> 0
begin
return;
end
-- Read to the current end of the feed; check the Outbox. If we read something
-- we put it into the log, so enter transaction.
-- we put it into the log, so enter transaction and get a lock.
set transaction isolation level read committed;
begin transaction
-- Use an application lock to make sure only one session will
@@ -395,30 +398,33 @@
@shard_id = @shard_id,
@lock_timeout = -1,
@lock_result = @lock_result output;
if @lock_result = 1
begin
rollback
-- 1 means "got lock after timeout". This means someone else has processed the outbox;
-- so we try again once. If there are no results here, it just means the outbox was empty
-- just now anyway .. the caller should sleep a bit before retrying.
insert into #read(ulid, ', @pklist,')
select top(@pagesize)
ulid,
', @pklist, '
from ', @feed_table, '
where
shard_id = @shard_id
and ulid > @cursor;
return
end;
if @lock_result < 0
begin
throw 77100, ''Error getting lock'', 1;
end;
-- @lock_result = 0; got lock without waiting. Consume outbox.
-- At this point it does not matter whether we got the lock with or without waiting; in BOTH
-- cases new data may have become available in the feed at some point
-- after our initial `select` above. So, we re-do the select while holding the
-- lock to ensure we really are at the head.
insert into #read(ulid, ', @pklist,')
select top(@pagesize)
ulid,
', @pklist, '
from ', @feed_table, '
where
shard_id = @shard_id
and ulid > @cursor
order by ulid;
if @@rowcount > 0
begin
-- OK, we raced another process that processed the outbox, so return the page that process produced
rollback
return
end;
declare @takenFromOutbox as table (
order_sequence bigint not null primary key,
@@ -505,6 +511,12 @@ as begin
ulid_high = iif(
-- embed max(time_hint, @previous_time) in ulid_high
@previous_time is null or taken.time_hint > @previous_time,
-- We do not use @next_ulid_high, because that is based on max(time_hint).
-- Instead we use the actual time_hint values; these are safe to use since:
-- a) We patch them above to be in the order of order_sequence.
-- b) We only do this if they are larger than @previous_time; otherwise we use the previous
--    counter values.
convert(binary(6), datediff_big(millisecond, ''1970-01-01 00:00:00'', taken.time_hint)),
@previous_ulid_high),
ulid_low = iif(
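The core of the bugfix to read_feed above is the re-select under the application lock: whether the lock was acquired with or without waiting, new rows may have been appended after the fast-path select, so the head check must be repeated while holding the lock before the outbox is consumed. A client-side Go rendering of that protocol, for clarity only — the real logic runs inside the generated procedure as dynamic SQL, and the lock resource string and helper here are hypothetical:

package main

import (
	"context"
	"database/sql"
	"fmt"
)

// drainIfStillAtHead mirrors, in plain Go + SQL, the procedure's locked
// section. Sketch only; the elided outbox move stands in for the dynamic SQL
// in the migration above.
func drainIfStillAtHead(ctx context.Context, db *sql.DB, shardID int, cursor []byte) error {
	tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Serialize head-of-feed processing per shard; @LockTimeout = -1 waits
	// forever, like @lock_timeout = -1 in the procedure.
	if _, err := tx.ExecContext(ctx,
		`exec sp_getapplock @Resource = @p1, @LockMode = 'Exclusive', @LockOwner = 'Transaction', @LockTimeout = -1;`,
		fmt.Sprintf("changefeed/myservice.TestLoadOutbox/shard/%d", shardID)); err != nil {
		return err
	}

	// Double-check under the lock: if rows appeared past our cursor while we
	// waited, another session already consumed the outbox; do not do it again.
	var n int
	if err := tx.QueryRowContext(ctx,
		`select count(*) from [changefeed].[feed:myservice.TestLoadOutbox] where shard_id = @p1 and ulid > @p2;`,
		shardID, cursor).Scan(&n); err != nil {
		return err
	}
	if n > 0 {
		return tx.Commit() // lost the race; just return that page to the caller
	}

	// Still at the head while holding the lock: safe to assign ULIDs to the
	// outbox rows and insert them into the feed (elided).
	return tx.Commit()
}

func main() {}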

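The ulid_high expression in the last hunk encodes milliseconds since the Unix epoch into the 6 high bytes of the ULID; convert(binary(6), <bigint>) keeps the 6 least significant bytes of the big-endian integer, and the remaining 10 bytes (ulid_low) carry the random/counter part. A small Go sketch of that byte layout — illustrative only, not code from this repository:

package main

import (
	"fmt"
	"time"
)

// makeULID mirrors the migration's layout: the first 6 bytes are the
// big-endian millisecond timestamp, the remaining 10 bytes are the
// random/counter low part.
func makeULID(t time.Time, low [10]byte) [16]byte {
	var u [16]byte
	ms := uint64(t.UnixMilli()) // 48 bits suffices until the year 10889
	u[0] = byte(ms >> 40)
	u[1] = byte(ms >> 32)
	u[2] = byte(ms >> 24)
	u[3] = byte(ms >> 16)
	u[4] = byte(ms >> 8)
	u[5] = byte(ms)
	copy(u[6:], low[:])
	return u
}

func main() {
	u := makeULID(time.Now(), [10]byte{})
	fmt.Printf("0x%X\n", u) // same hex format as the test's cursor strings
}

Because the timestamp occupies the most significant bytes, sorting the raw 16-byte values sorts events by time first, which is what lets the readers page through the feed with a simple ulid > @cursor comparison.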