From f560d00a424e9241418ee3018b2f0abe2d105ec9 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 18 Jul 2023 16:30:11 -0400 Subject: [PATCH 1/4] Add (failing) unit test for bug Signed-off-by: Matt Lord --- go/vt/vtgate/endtoend/vstream_test.go | 101 ++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 03c2e86ff14..22869bea5ad 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -666,6 +666,107 @@ func TestVStreamSharded(t *testing.T) { } +// TestVStreamCopyTransaction tests that we are properly wrapping +// row events in the stream with BEGIN and COMMIT events. +func TestVStreamCopyTransactions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + keyspace := "ks" + shards := []string{"-80", "80-"} + table := "t1_copy_basic" + beginEventSeen, commitEventSeen := false, false + numResultInTrx := 0 + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: keyspace, + Shard: shards[0], + Gtid: "", // Start a vstream copy + }, + { + Keyspace: keyspace, + Shard: shards[1], + Gtid: "", // Start a vstream copy + }, + }, + } + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }}, + } + + gconn, conn, _, closeConnections := initialize(ctx, t) + defer closeConnections() + + // Generate some test data. + for i := 1; i <= 100000; i++ { + values := fmt.Sprintf("(%d, %d)", i, i) + q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values) + _, err := conn.ExecuteFetch(q, 1, false) + require.NoError(t, err, "error inserting data: %v", err) + } + + // Start a vstream. + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil) + require.NoError(t, err, "error starting vstream") + +recvLoop: + for { + e, err := reader.Recv() + numResultInTrx++ + eventCount := len(e) + t.Logf("----------------------------Received %d events in response #%d for the transaction --------------------------------------\n", eventCount, numResultInTrx) + switch err { + case nil: + for _, event := range e { + switch event.Type { + case binlogdatapb.VEventType_BEGIN: + beginEventSeen = true + t.Logf("Found BEGIN event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx) + require.False(t, commitEventSeen, "Received COMMIT event before receiving BEGIN event: numResultInTrx=%v\n", numResultInTrx) + case binlogdatapb.VEventType_VGTID: + t.Logf("Found VGTID event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_FIELD: + t.Logf("Found FIELD event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_ROW: + // Uncomment if you need to do more debugging. + //t.Logf("Found ROW event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_COMMIT: + commitEventSeen = true + t.Logf("Found COMMIT event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + require.True(t, beginEventSeen, "Received COMMIT event before receiving BEGIN event: numResultInTrx=%v\n", numResultInTrx) + case binlogdatapb.VEventType_COPY_COMPLETED: + t.Logf("Finished vstream copy\n") + t.Logf("-------------------------------------------------------------------\n\n") + cancel() + break recvLoop + default: + t.Logf("Found extraneous event: %+v\n", event) + } + if beginEventSeen && commitEventSeen { + t.Logf("Received both BEGIN and COMMIT, so resetting transactional state\n") + beginEventSeen = false + commitEventSeen = false + numResultInTrx = 0 + } + } + case io.EOF: + t.Logf("vstream ended\n") + t.Logf("-------------------------------------------------------------------\n\n") + cancel() + return + default: + require.FailNowf(t, "unexpected error", "encountered error in vstream: %v", err) + return + } + } + if beginEventSeen || commitEventSeen { + require.True(t, (beginEventSeen && commitEventSeen), "Did not receive both BEGIN and COMMIT in last transaction") + } +} + func removeAnyDeprecatedDisplayWidths(orig string) string { var adjusted string baseIntType := "int" From 561a9495f1d8930c8702850f7ae020336774a9ab Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 18 Jul 2023 19:57:10 -0400 Subject: [PATCH 2/4] Ensure that ROW events are sent within a trx Signed-off-by: Matt Lord --- go/vt/vtgate/endtoend/vstream_test.go | 8 +++++++- go/vt/vttablet/tabletserver/vstreamer/copy.go | 16 +++++++++++++++ .../tabletserver/vstreamer/uvstreamer.go | 20 +++++++++++-------- 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 22869bea5ad..f3db014a07a 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -700,7 +700,13 @@ func TestVStreamCopyTransactions(t *testing.T) { gconn, conn, _, closeConnections := initialize(ctx, t) defer closeConnections() - // Generate some test data. + // Clear any existing data. + q := fmt.Sprintf("delete from %s", table) + _, err := conn.ExecuteFetch(q, -1, false) + require.NoError(t, err, "error clearing data: %v", err) + + // Generate some test data. Enough to cross the vstream_packet_size + // threshold. for i := 1; i <= 100000; i++ { values := fmt.Sprintf("(%d, %d)", i, i) q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values) diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index bc84b1e57ed..44f1f82219f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -255,12 +255,26 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendFieldEvent returned error %v", err) return err } + // sendFieldEvent sends a BEGIN event first. + uvs.inTransaction = true } + if len(rows.Rows) == 0 { log.V(2).Infof("0 rows returned for table %s", tableName) return nil } + // We are about to send ROW events, so we need to ensure + // that we do so within a transaction. The COMMIT event + // will be sent in sendEventsForRows() below. + if !uvs.inTransaction { + evs := []*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_BEGIN, + }} + uvs.send(evs) + uvs.inTransaction = true + } + newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{ Fields: uvs.pkfields, Rows: []*querypb.Row{rows.Lastpk}, @@ -271,6 +285,8 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendEventsForRows returned error %v", err) return err } + // sendEventsForRows sends a COMMIT event last. + uvs.inTransaction = false uvs.setCopyState(tableName, qrLastPK) log.V(2).Infof("NewLastPK: %v", qrLastPK) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 0eec8d93db3..a1ea07a92fa 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -52,14 +52,18 @@ type uvstreamer struct { cancel func() // input parameters - vse *Engine - send func([]*binlogdatapb.VEvent) error - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - inTablePKs []*binlogdatapb.TableLastPK - throttlerApp throttlerapp.Name + vse *Engine + send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + // Are we currently in an explicit transaction? + // If we are not, and we're about to send ROW + // events, then we need to send a BEGIN event first. + inTransaction bool + filter *binlogdatapb.Filter + inTablePKs []*binlogdatapb.TableLastPK + throttlerApp throttlerapp.Name vschema *localVSchema From 184b0315322825af1f23dcb1be3a42e5a86edeae Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 19 Jul 2023 10:39:20 -0400 Subject: [PATCH 3/4] Minor tweaks after self review Signed-off-by: Matt Lord --- go/vt/vtgate/endtoend/vstream_test.go | 34 +++++++++++++------ go/vt/vttablet/tabletserver/vstreamer/copy.go | 4 +-- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index f3db014a07a..5149378d7e0 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -666,7 +666,7 @@ func TestVStreamSharded(t *testing.T) { } -// TestVStreamCopyTransaction tests that we are properly wrapping +// TestVStreamCopyTransactions tests that we are properly wrapping // row events in the stream with BEGIN and COMMIT events. func TestVStreamCopyTransactions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -705,8 +705,8 @@ func TestVStreamCopyTransactions(t *testing.T) { _, err := conn.ExecuteFetch(q, -1, false) require.NoError(t, err, "error clearing data: %v", err) - // Generate some test data. Enough to cross the vstream_packet_size - // threshold. + // Generate some test data. Enough to cross the default + // vstream_packet_size threshold. for i := 1; i <= 100000; i++ { values := fmt.Sprintf("(%d, %d)", i, i) q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values) @@ -723,26 +723,36 @@ recvLoop: e, err := reader.Recv() numResultInTrx++ eventCount := len(e) - t.Logf("----------------------------Received %d events in response #%d for the transaction --------------------------------------\n", eventCount, numResultInTrx) + t.Logf("------------------ Received %d events in response #%d for the transaction ------------------\n", + eventCount, numResultInTrx) switch err { case nil: for _, event := range e { switch event.Type { case binlogdatapb.VEventType_BEGIN: + require.False(t, beginEventSeen, "Received a second BEGIN event within the transaction: numResultInTrx=%v\n", + numResultInTrx) beginEventSeen = true - t.Logf("Found BEGIN event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx) - require.False(t, commitEventSeen, "Received COMMIT event before receiving BEGIN event: numResultInTrx=%v\n", numResultInTrx) + t.Logf("Found BEGIN event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx) + require.False(t, commitEventSeen, "Received a BEGIN event when expecting a COMMIT event: numResultInTrx=%v\n", + numResultInTrx) case binlogdatapb.VEventType_VGTID: - t.Logf("Found VGTID event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + t.Logf("Found VGTID event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) case binlogdatapb.VEventType_FIELD: - t.Logf("Found FIELD event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + t.Logf("Found FIELD event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) case binlogdatapb.VEventType_ROW: // Uncomment if you need to do more debugging. - //t.Logf("Found ROW event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + // t.Logf("Found ROW event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", + // beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) case binlogdatapb.VEventType_COMMIT: commitEventSeen = true - t.Logf("Found COMMIT event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) - require.True(t, beginEventSeen, "Received COMMIT event before receiving BEGIN event: numResultInTrx=%v\n", numResultInTrx) + t.Logf("Found COMMIT event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + require.True(t, beginEventSeen, "Received COMMIT event before receiving BEGIN event: numResultInTrx=%v\n", + numResultInTrx) case binlogdatapb.VEventType_COPY_COMPLETED: t.Logf("Finished vstream copy\n") t.Logf("-------------------------------------------------------------------\n\n") @@ -768,6 +778,8 @@ recvLoop: return } } + // The last response, when the vstream copy completes, does not + // typically contain ROW events. if beginEventSeen || commitEventSeen { require.True(t, (beginEventSeen && commitEventSeen), "Did not receive both BEGIN and COMMIT in last transaction") } diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 44f1f82219f..06e90688482 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -255,7 +255,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendFieldEvent returned error %v", err) return err } - // sendFieldEvent sends a BEGIN event first. + // sendFieldEvent() sends a BEGIN event first. uvs.inTransaction = true } @@ -285,7 +285,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendEventsForRows returned error %v", err) return err } - // sendEventsForRows sends a COMMIT event last. + // sendEventsForRows() sends a COMMIT event last. uvs.inTransaction = false uvs.setCopyState(tableName, qrLastPK) From ce8f653e88b3c463202bd11f90e52f47a8be5b4c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Jul 2023 10:45:41 -0400 Subject: [PATCH 4/4] Minor tweaks from final self review Signed-off-by: Matt Lord --- go/vt/vtgate/endtoend/vstream_test.go | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index d72f64e9b6d..73086137242 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -668,7 +668,7 @@ func TestVStreamSharded(t *testing.T) { } // TestVStreamCopyTransactions tests that we are properly wrapping -// row events in the stream with BEGIN and COMMIT events. +// ROW events in the stream with BEGIN and COMMIT events. func TestVStreamCopyTransactions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -717,42 +717,42 @@ func TestVStreamCopyTransactions(t *testing.T) { // Start a vstream. reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil) - require.NoError(t, err, "error starting vstream") + require.NoError(t, err, "error starting vstream: %v", err) recvLoop: for { - e, err := reader.Recv() + vevents, err := reader.Recv() numResultInTrx++ - eventCount := len(e) + eventCount := len(vevents) t.Logf("------------------ Received %d events in response #%d for the transaction ------------------\n", eventCount, numResultInTrx) switch err { case nil: - for _, event := range e { + for _, event := range vevents { switch event.Type { case binlogdatapb.VEventType_BEGIN: - require.False(t, beginEventSeen, "Received a second BEGIN event within the transaction: numResultInTrx=%v\n", + require.False(t, beginEventSeen, "received a second BEGIN event within the transaction: numResultInTrx=%d\n", numResultInTrx) beginEventSeen = true - t.Logf("Found BEGIN event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v\n", + t.Logf("Found BEGIN event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx) - require.False(t, commitEventSeen, "Received a BEGIN event when expecting a COMMIT event: numResultInTrx=%v\n", + require.False(t, commitEventSeen, "received a BEGIN event when expecting a COMMIT event: numResultInTrx=%d\n", numResultInTrx) case binlogdatapb.VEventType_VGTID: - t.Logf("Found VGTID event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", + t.Logf("Found VGTID event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) case binlogdatapb.VEventType_FIELD: - t.Logf("Found FIELD event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", + t.Logf("Found FIELD event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) case binlogdatapb.VEventType_ROW: // Uncomment if you need to do more debugging. - // t.Logf("Found ROW event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", + // t.Logf("Found ROW event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", // beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) case binlogdatapb.VEventType_COMMIT: commitEventSeen = true - t.Logf("Found COMMIT event, beginEventSeen=%v, commitEventSeen=%v, eventType=%v, numResultInTrx=%v, event=%+v\n", + t.Logf("Found COMMIT event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) - require.True(t, beginEventSeen, "Received COMMIT event before receiving BEGIN event: numResultInTrx=%v\n", + require.True(t, beginEventSeen, "received COMMIT event before receiving BEGIN event: numResultInTrx=%d\n", numResultInTrx) case binlogdatapb.VEventType_COPY_COMPLETED: t.Logf("Finished vstream copy\n") @@ -782,7 +782,7 @@ recvLoop: // The last response, when the vstream copy completes, does not // typically contain ROW events. if beginEventSeen || commitEventSeen { - require.True(t, (beginEventSeen && commitEventSeen), "Did not receive both BEGIN and COMMIT in last transaction") + require.True(t, (beginEventSeen && commitEventSeen), "did not receive both BEGIN and COMMIT events in the final ROW event set") } }